nautilus_common/logging/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The logging framework for Nautilus systems.
17//!
18//! This module implements a high-performance logging subsystem that operates in a separate thread
19//! using an MPSC channel for log message delivery. The system uses reference counting to track
20//! active `LogGuard` instances, ensuring the logging thread completes all pending writes before
21//! termination.
22//!
23//! # LogGuard Reference Counting
24//!
25//! The logging system maintains a global count of active `LogGuard` instances using an atomic
26//! counter (`LOGGING_GUARDS_ACTIVE`). When a `LogGuard` is created, the counter is incremented,
27//! and when dropped, it's decremented. When the last `LogGuard` is dropped (counter reaches zero),
28//! the logging thread is properly joined to ensure all buffered log messages are written to their
29//! destinations before the process terminates.
30//!
31//! The system supports a maximum of 255 concurrent `LogGuard` instances. Attempting to create
32//! more will cause a panic.
33
34pub mod headers;
35pub mod logger;
36pub mod macros;
37pub mod writer;
38
39use std::{
40    collections::HashMap,
41    env,
42    str::FromStr,
43    sync::atomic::{AtomicBool, AtomicU8, Ordering},
44};
45
46use log::LevelFilter;
47// Re-exports
48pub use macros::{log_debug, log_error, log_info, log_trace, log_warn};
49use nautilus_core::{UUID4, time::get_atomic_clock_static};
50use nautilus_model::identifiers::TraderId;
51use tracing_subscriber::EnvFilter;
52use ustr::Ustr;
53
54use self::{
55    logger::{LogGuard, Logger, LoggerConfig},
56    writer::FileWriterConfig,
57};
58use crate::enums::LogLevel;
59
// Short marker strings exported for embedding in log messages.
// NOTE(review): presumably `RECV`/`SEND` mark message direction and the bracketed tags mark the
// message category (command, event, document, report, request, response) — confirm against callers.
pub const RECV: &str = "<--";
pub const SEND: &str = "-->";
pub const CMD: &str = "[CMD]";
pub const EVT: &str = "[EVT]";
pub const DOC: &str = "[DOC]";
pub const RPT: &str = "[RPT]";
pub const REQ: &str = "[REQ]";
pub const RES: &str = "[RES]";
68
// Global logging state shared across the logging subsystem.

// Set to `true` by `init_logging` once the logger has been initialized successfully.
static LOGGING_INITIALIZED: AtomicBool = AtomicBool::new(false);
// Set to `true` by `logging_set_bypass`; never reset within this module.
static LOGGING_BYPASSED: AtomicBool = AtomicBool::new(false);
// `true` selects the real-time logging clock, `false` the static clock (defaults to real-time).
static LOGGING_REALTIME: AtomicBool = AtomicBool::new(true);
// Mirrors `LoggerConfig::is_colored` after `init_logging` succeeds (defaults to `true`).
static LOGGING_COLORED: AtomicBool = AtomicBool::new(true);
// Number of live `LogGuard` instances (see the module docs); a `u8`, hence the 255 limit.
static LOGGING_GUARDS_ACTIVE: AtomicU8 = AtomicU8::new(0);
74
/// Returns whether the core logger is enabled.
///
/// Reads the `LOGGING_INITIALIZED` flag, which `init_logging` sets to `true` after the logger
/// has been initialized successfully.
pub fn logging_is_initialized() -> bool {
    LOGGING_INITIALIZED.load(Ordering::Relaxed)
}
79
/// Sets the logging subsystem to bypass mode.
///
/// Stores `true` into the `LOGGING_BYPASSED` flag. This module offers no way to clear the
/// flag again.
pub fn logging_set_bypass() {
    LOGGING_BYPASSED.store(true, Ordering::Relaxed);
}
84
/// Shuts down the logging subsystem.
///
/// Thin wrapper that forwards to `logger::shutdown_graceful`, where the actual shutdown
/// sequence is implemented.
pub fn logging_shutdown() {
    // Perform a graceful shutdown: prevent new logs, signal Close, drain and join.
    // Delegates to logger implementation which has access to the internals.
    crate::logging::logger::shutdown_graceful();
}
91
/// Returns whether the core logger is using ANSI colors.
///
/// Reads the `LOGGING_COLORED` flag, which defaults to `true` and is overwritten with the
/// configured `is_colored` value when `init_logging` succeeds.
pub fn logging_is_colored() -> bool {
    LOGGING_COLORED.load(Ordering::Relaxed)
}
96
/// Sets the global logging clock to real-time mode.
///
/// Stores `true` into the `LOGGING_REALTIME` flag (this is also the flag's initial value).
pub fn logging_clock_set_realtime_mode() {
    LOGGING_REALTIME.store(true, Ordering::Relaxed);
}
101
/// Sets the global logging clock to static mode.
///
/// Stores `false` into the `LOGGING_REALTIME` flag. The static time itself is set separately
/// via [`logging_clock_set_static_time`].
pub fn logging_clock_set_static_mode() {
    LOGGING_REALTIME.store(false, Ordering::Relaxed);
}
106
107/// Sets the global logging clock static time with the given UNIX timestamp (nanoseconds).
108pub fn logging_clock_set_static_time(time_ns: u64) {
109    let clock = get_atomic_clock_static();
110    clock.set_time(time_ns.into());
111}
112
113/// Initialize tracing.
114///
115/// Tracing is meant to be used to trace/debug async Rust code. It can be
116/// configured to filter modules and write up to a specific level by passing
117/// a configuration using the `RUST_LOG` environment variable.
118///
119/// # Safety
120///
121/// Should only be called once during an applications run, ideally at the
122/// beginning of the run.
123///
124/// # Errors
125///
126/// Returns an error if tracing subscriber fails to initialize.
127pub fn init_tracing() -> anyhow::Result<()> {
128    // Skip tracing initialization if `RUST_LOG` is not set
129    if let Ok(v) = env::var("RUST_LOG") {
130        let env_filter = EnvFilter::new(v.clone());
131
132        if tracing_subscriber::fmt()
133            .with_env_filter(env_filter)
134            .try_init()
135            .is_ok()
136        {
137            println!("Initialized tracing logs with RUST_LOG={v}");
138        }
139    }
140    Ok(())
141}
142
143/// Initialize logging.
144///
145/// Logging should be used for Python and sync Rust logic which is most of
146/// the components in the [nautilus_trader](https://pypi.org/project/nautilus_trader) package.
147/// Logging can be configured to filter components and write up to a specific level only
148/// by passing a configuration using the `NAUTILUS_LOG` environment variable.
149///
150/// # Safety
151///
152/// Should only be called once during an applications run, ideally at the
153/// beginning of the run.
154///
155/// Logging should be used for Python and sync Rust logic which is most of
156/// the components in the `nautilus_trader` package.
157/// Logging can be configured via the `NAUTILUS_LOG` environment variable.
158///
159/// # Errors
160///
161/// Returns an error if the logging subsystem fails to initialize.
162pub fn init_logging(
163    trader_id: TraderId,
164    instance_id: UUID4,
165    config: LoggerConfig,
166    file_config: FileWriterConfig,
167) -> anyhow::Result<LogGuard> {
168    // Only set these after successful initialization
169    let is_colored = config.is_colored;
170    let guard = Logger::init_with_config(trader_id, instance_id, config, file_config)?;
171
172    // Set flags only after successful initialization
173    LOGGING_INITIALIZED.store(true, Ordering::Relaxed);
174    LOGGING_COLORED.store(is_colored, Ordering::Relaxed);
175
176    Ok(guard)
177}
178
179#[must_use]
180pub const fn map_log_level_to_filter(log_level: LogLevel) -> LevelFilter {
181    match log_level {
182        LogLevel::Off => LevelFilter::Off,
183        LogLevel::Trace => LevelFilter::Trace,
184        LogLevel::Debug => LevelFilter::Debug,
185        LogLevel::Info => LevelFilter::Info,
186        LogLevel::Warning => LevelFilter::Warn,
187        LogLevel::Error => LevelFilter::Error,
188    }
189}
190
191/// Parses a string into a [`LevelFilter`].
192///
193/// # Errors
194///
195/// Returns an error if the provided string is not a valid `LevelFilter`.
196pub fn parse_level_filter_str(s: &str) -> anyhow::Result<LevelFilter> {
197    let mut log_level_str = s.to_string().to_uppercase();
198    if log_level_str == "WARNING" {
199        log_level_str = "WARN".to_string();
200    }
201    LevelFilter::from_str(&log_level_str)
202        .map_err(|_| anyhow::anyhow!("Invalid log level string: '{s}'"))
203}
204
205/// Parses component-specific log levels from a JSON value map.
206///
207/// # Errors
208///
209/// Returns an error if a JSON value in the map is not a string or is not a valid log level.
210pub fn parse_component_levels(
211    original_map: Option<HashMap<String, serde_json::Value>>,
212) -> anyhow::Result<HashMap<Ustr, LevelFilter>> {
213    match original_map {
214        Some(map) => {
215            let mut new_map = HashMap::new();
216            for (key, value) in map {
217                let ustr_key = Ustr::from(&key);
218                let s = value.as_str().ok_or_else(|| {
219                    anyhow::anyhow!(
220                        "Component log level for '{key}' must be a string, was: {value}"
221                    )
222                })?;
223                let lvl = parse_level_filter_str(s)?;
224                new_map.insert(ustr_key, lvl);
225            }
226            Ok(new_map)
227        }
228        None => Ok(HashMap::new()),
229    }
230}
231
/// Logs that a task has started using `tracing::debug!`.
///
/// Emits the message `Started task '<task_name>'`.
pub fn log_task_started(task_name: &str) {
    tracing::debug!("Started task '{task_name}'");
}
236
/// Logs that a task has stopped using `tracing::debug!`.
///
/// Emits the message `Stopped task '<task_name>'`.
pub fn log_task_stopped(task_name: &str) {
    tracing::debug!("Stopped task '{task_name}'");
}
241
/// Logs that a task is being awaited using `tracing::debug!`.
///
/// Emits the message `Awaiting task '<task_name>'`.
pub fn log_task_awaiting(task_name: &str) {
    tracing::debug!("Awaiting task '{task_name}'");
}
246
/// Logs that a task was aborted using `tracing::debug!`.
///
/// Emits the message `Aborted task '<task_name>'`.
pub fn log_task_aborted(task_name: &str) {
    tracing::debug!("Aborted task '{task_name}'");
}
251
/// Logs that there was an error in a task using `tracing::error!`.
///
/// Emits the message `Error in task '<task_name>': <e>`, formatting the error with its
/// `Display` implementation.
pub fn log_task_error(task_name: &str, e: &anyhow::Error) {
    tracing::error!("Error in task '{task_name}': {e}");
}
256
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // Level names must parse regardless of letter case; "WARNING" is an alias for "WARN".
    #[rstest]
    #[case("DEBUG", LevelFilter::Debug)]
    #[case("debug", LevelFilter::Debug)]
    #[case("Debug", LevelFilter::Debug)]
    #[case("DeBuG", LevelFilter::Debug)]
    #[case("INFO", LevelFilter::Info)]
    #[case("info", LevelFilter::Info)]
    #[case("WARNING", LevelFilter::Warn)]
    #[case("warning", LevelFilter::Warn)]
    #[case("WARN", LevelFilter::Warn)]
    #[case("warn", LevelFilter::Warn)]
    #[case("ERROR", LevelFilter::Error)]
    #[case("error", LevelFilter::Error)]
    #[case("OFF", LevelFilter::Off)]
    #[case("off", LevelFilter::Off)]
    #[case("TRACE", LevelFilter::Trace)]
    #[case("trace", LevelFilter::Trace)]
    fn test_parse_level_filter_str_case_insensitive(
        #[case] input: &str,
        #[case] expected: LevelFilter,
    ) {
        let parsed = parse_level_filter_str(input).unwrap();
        assert_eq!(parsed, expected);
    }

    // Anything that is not a known level name (including the empty string) is rejected.
    #[rstest]
    #[case("INVALID")]
    #[case("DEBG")]
    #[case("WARNINGG")]
    #[case("")]
    #[case("INFO123")]
    fn test_parse_level_filter_str_invalid_returns_error(#[case] invalid_input: &str) {
        // `unwrap_err` panics (failing the test) if parsing unexpectedly succeeds.
        let message = parse_level_filter_str(invalid_input)
            .unwrap_err()
            .to_string();

        assert!(message.contains("Invalid log level"));
    }

    #[rstest]
    fn test_parse_component_levels_valid() {
        let levels = HashMap::from([
            ("Strategy1".to_string(), serde_json::Value::from("DEBUG")),
            ("Strategy2".to_string(), serde_json::Value::from("info")),
        ]);

        let parsed = parse_component_levels(Some(levels)).unwrap();

        assert_eq!(parsed.len(), 2);
        assert_eq!(
            parsed.get(&Ustr::from("Strategy1")),
            Some(&LevelFilter::Debug)
        );
        assert_eq!(
            parsed.get(&Ustr::from("Strategy2")),
            Some(&LevelFilter::Info)
        );
    }

    #[rstest]
    fn test_parse_component_levels_non_string_value_returns_error() {
        let levels = HashMap::from([("Strategy1".to_string(), serde_json::Value::from(123))]);

        let message = parse_component_levels(Some(levels))
            .unwrap_err()
            .to_string();

        assert!(message.contains("must be a string"));
    }

    #[rstest]
    fn test_parse_component_levels_invalid_level_returns_error() {
        let levels = HashMap::from([(
            "Strategy1".to_string(),
            serde_json::Value::from("INVALID_LEVEL"),
        )]);

        let message = parse_component_levels(Some(levels))
            .unwrap_err()
            .to_string();

        assert!(message.contains("Invalid log level"));
    }

    #[rstest]
    fn test_parse_component_levels_none_returns_empty() {
        let parsed = parse_component_levels(None).unwrap();
        assert!(parsed.is_empty());
    }
}