nautilus_common/logging/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The logging framework for Nautilus systems.
17//!
18//! This module implements a high-performance logging subsystem that operates in a separate thread
19//! using an MPSC channel for log message delivery. The system uses reference counting to track
20//! active `LogGuard` instances, ensuring the logging thread completes all pending writes before
21//! termination.
22//!
23//! # LogGuard Reference Counting
24//!
25//! The logging system maintains a global count of active `LogGuard` instances using an atomic
26//! counter (`LOGGING_GUARDS_ACTIVE`). When a `LogGuard` is created, the counter is incremented,
27//! and when dropped, it's decremented. When the last `LogGuard` is dropped (counter reaches zero),
28//! the logging thread is properly joined to ensure all buffered log messages are written to their
29//! destinations before the process terminates.
30//!
31//! The system supports a maximum of 255 concurrent `LogGuard` instances. Attempting to create
32//! more will cause a panic.
33
34pub mod config;
35pub mod headers;
36pub mod logger;
37pub mod macros;
38pub mod writer;
39
40use std::{
41    collections::HashMap,
42    env,
43    str::FromStr,
44    sync::{
45        OnceLock,
46        atomic::{AtomicBool, AtomicU8, Ordering},
47    },
48};
49
50use ahash::AHashMap;
51use log::LevelFilter;
52// Re-exports
53pub use macros::{log_debug, log_error, log_info, log_trace, log_warn};
54use nautilus_core::{UUID4, time::get_atomic_clock_static};
55use nautilus_model::identifiers::TraderId;
56use ustr::Ustr;
57
58use self::{
59    logger::{LogGuard, Logger, LoggerConfig},
60    writer::FileWriterConfig,
61};
62use crate::enums::LogLevel;
63
// Marker strings shared by log call sites. The meanings given for the
// abbreviations are inferred from the constant names — confirm against usage.

/// Arrow marker for the receive direction.
pub const RECV: &str = "<--";
/// Arrow marker for the send direction.
pub const SEND: &str = "-->";
/// Tag for a command (presumably).
pub const CMD: &str = "[CMD]";
/// Tag for an event (presumably).
pub const EVT: &str = "[EVT]";
/// Tag for a document (presumably).
pub const DOC: &str = "[DOC]";
/// Tag for a report (presumably).
pub const RPT: &str = "[RPT]";
/// Tag for a request (presumably).
pub const REQ: &str = "[REQ]";
/// Tag for a response (presumably).
pub const RES: &str = "[RES]";
72
73static LOGGING_INITIALIZED: AtomicBool = AtomicBool::new(false);
74static LOGGING_BYPASSED: AtomicBool = AtomicBool::new(false);
75static LOGGING_REALTIME: AtomicBool = AtomicBool::new(true);
76static LOGGING_COLORED: AtomicBool = AtomicBool::new(true);
77static LOGGING_GUARDS_ACTIVE: AtomicU8 = AtomicU8::new(0);
78static LAZY_GUARD: OnceLock<Option<LogGuard>> = OnceLock::new();
79
80/// Returns whether the core logger is enabled.
81pub fn logging_is_initialized() -> bool {
82    LOGGING_INITIALIZED.load(Ordering::Relaxed)
83}
84
85/// Ensures logging is initialized on first use.
86///
87/// If `NAUTILUS_LOG` is set, initializes the logger with the specified config.
88/// Otherwise, initializes with INFO level to stdout. This enables lazy
89/// initialization for Rust-only binaries that don't go through the Python
90/// kernel initialization.
91///
92/// Returns `true` if logging is available (either already initialized or
93/// successfully lazy-initialized), `false` otherwise.
94pub fn ensure_logging_initialized() -> bool {
95    if LOGGING_INITIALIZED.load(Ordering::SeqCst) {
96        return true;
97    }
98
99    LAZY_GUARD.get_or_init(|| {
100        let config = env::var("NAUTILUS_LOG")
101            .ok()
102            .and_then(|spec| LoggerConfig::from_spec(&spec).ok())
103            .unwrap_or_default();
104
105        Logger::init_with_config(
106            TraderId::default(),
107            UUID4::default(),
108            config,
109            FileWriterConfig::default(),
110        )
111        .ok()
112    });
113
114    LOGGING_INITIALIZED.load(Ordering::SeqCst)
115}
116
117/// Sets the logging subsystem to bypass mode.
118pub fn logging_set_bypass() {
119    LOGGING_BYPASSED.store(true, Ordering::Relaxed);
120}
121
122/// Shuts down the logging subsystem.
123pub fn logging_shutdown() {
124    // Perform a graceful shutdown: prevent new logs, signal Close, drain and join.
125    // Delegates to logger implementation which has access to the internals.
126    crate::logging::logger::shutdown_graceful();
127}
128
129/// Returns whether the core logger is using ANSI colors.
130pub fn logging_is_colored() -> bool {
131    LOGGING_COLORED.load(Ordering::Relaxed)
132}
133
134/// Sets the global logging clock to real-time mode.
135pub fn logging_clock_set_realtime_mode() {
136    LOGGING_REALTIME.store(true, Ordering::Relaxed);
137}
138
139/// Sets the global logging clock to static mode.
140pub fn logging_clock_set_static_mode() {
141    LOGGING_REALTIME.store(false, Ordering::Relaxed);
142}
143
144/// Sets the global logging clock static time with the given UNIX timestamp (nanoseconds).
145pub fn logging_clock_set_static_time(time_ns: u64) {
146    let clock = get_atomic_clock_static();
147    clock.set_time(time_ns.into());
148}
149
150/// Initialize logging.
151///
152/// Logging should be used for Python and sync Rust logic which is most of
153/// the components in the [nautilus_trader](https://pypi.org/project/nautilus_trader) package.
154/// Logging can be configured to filter components and write up to a specific level only
155/// by passing a configuration using the `NAUTILUS_LOG` environment variable.
156///
157/// # Safety
158///
159/// Should only be called once during an applications run, ideally at the
160/// beginning of the run.
161///
162/// Logging should be used for Python and sync Rust logic which is most of
163/// the components in the `nautilus_trader` package.
164/// Logging can be configured via the `NAUTILUS_LOG` environment variable.
165///
166/// # Errors
167///
168/// Returns an error if the logging subsystem fails to initialize.
169pub fn init_logging(
170    trader_id: TraderId,
171    instance_id: UUID4,
172    config: LoggerConfig,
173    file_config: FileWriterConfig,
174) -> anyhow::Result<LogGuard> {
175    Logger::init_with_config(trader_id, instance_id, config, file_config)
176}
177
178#[must_use]
179pub const fn map_log_level_to_filter(log_level: LogLevel) -> LevelFilter {
180    match log_level {
181        LogLevel::Off => LevelFilter::Off,
182        LogLevel::Trace => LevelFilter::Trace,
183        LogLevel::Debug => LevelFilter::Debug,
184        LogLevel::Info => LevelFilter::Info,
185        LogLevel::Warning => LevelFilter::Warn,
186        LogLevel::Error => LevelFilter::Error,
187    }
188}
189
190/// Parses a string into a [`LevelFilter`].
191///
192/// # Errors
193///
194/// Returns an error if the provided string is not a valid `LevelFilter`.
195pub fn parse_level_filter_str(s: &str) -> anyhow::Result<LevelFilter> {
196    let mut log_level_str = s.to_string().to_uppercase();
197    if log_level_str == "WARNING" {
198        log_level_str = "WARN".to_string();
199    }
200    LevelFilter::from_str(&log_level_str)
201        .map_err(|_| anyhow::anyhow!("Invalid log level string: '{s}'"))
202}
203
204/// Parses component-specific log levels from a JSON value map.
205///
206/// # Errors
207///
208/// Returns an error if a JSON value in the map is not a string or is not a valid log level.
209pub fn parse_component_levels(
210    original_map: Option<HashMap<String, serde_json::Value>>,
211) -> anyhow::Result<AHashMap<Ustr, LevelFilter>> {
212    match original_map {
213        Some(map) => {
214            let mut new_map = AHashMap::new();
215            for (key, value) in map {
216                let ustr_key = Ustr::from(&key);
217                let s = value.as_str().ok_or_else(|| {
218                    anyhow::anyhow!(
219                        "Component log level for '{key}' must be a string, was: {value}"
220                    )
221                })?;
222                let lvl = parse_level_filter_str(s)?;
223                new_map.insert(ustr_key, lvl);
224            }
225            Ok(new_map)
226        }
227        None => Ok(AHashMap::new()),
228    }
229}
230
231/// Logs that a task has started.
232pub fn log_task_started(task_name: &str) {
233    log::debug!("Started task '{task_name}'");
234}
235
236/// Logs that a task has stopped.
237pub fn log_task_stopped(task_name: &str) {
238    log::debug!("Stopped task '{task_name}'");
239}
240
241/// Logs that a task is being awaited.
242pub fn log_task_awaiting(task_name: &str) {
243    log::debug!("Awaiting task '{task_name}'");
244}
245
246/// Logs that a task was aborted.
247pub fn log_task_aborted(task_name: &str) {
248    log::debug!("Aborted task '{task_name}'");
249}
250
251/// Logs that there was an error in a task.
252pub fn log_task_error(task_name: &str, e: &anyhow::Error) {
253    log::error!("Error in task '{task_name}': {e}");
254}
255
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // Level names are accepted in any letter case, and "WARNING" is an alias for "WARN".
    #[rstest]
    #[case("DEBUG", LevelFilter::Debug)]
    #[case("debug", LevelFilter::Debug)]
    #[case("Debug", LevelFilter::Debug)]
    #[case("DeBuG", LevelFilter::Debug)]
    #[case("INFO", LevelFilter::Info)]
    #[case("info", LevelFilter::Info)]
    #[case("WARNING", LevelFilter::Warn)]
    #[case("warning", LevelFilter::Warn)]
    #[case("WARN", LevelFilter::Warn)]
    #[case("warn", LevelFilter::Warn)]
    #[case("ERROR", LevelFilter::Error)]
    #[case("error", LevelFilter::Error)]
    #[case("OFF", LevelFilter::Off)]
    #[case("off", LevelFilter::Off)]
    #[case("TRACE", LevelFilter::Trace)]
    #[case("trace", LevelFilter::Trace)]
    fn test_parse_level_filter_str_case_insensitive(
        #[case] input: &str,
        #[case] expected: LevelFilter,
    ) {
        assert_eq!(parse_level_filter_str(input).unwrap(), expected);
    }

    #[rstest]
    #[case("INVALID")]
    #[case("DEBG")]
    #[case("WARNINGG")]
    #[case("")]
    #[case("INFO123")]
    fn test_parse_level_filter_str_invalid_returns_error(#[case] invalid_input: &str) {
        let outcome = parse_level_filter_str(invalid_input);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid log level"));
    }

    #[rstest]
    fn test_parse_component_levels_valid() {
        let map = HashMap::from([
            ("Strategy1".to_string(), serde_json::Value::from("DEBUG")),
            ("Strategy2".to_string(), serde_json::Value::from("info")),
        ]);

        let levels = parse_component_levels(Some(map)).unwrap();

        assert_eq!(levels.len(), 2);
        assert_eq!(
            levels.get(&Ustr::from("Strategy1")),
            Some(&LevelFilter::Debug)
        );
        assert_eq!(
            levels.get(&Ustr::from("Strategy2")),
            Some(&LevelFilter::Info)
        );
    }

    #[rstest]
    fn test_parse_component_levels_non_string_value_returns_error() {
        let map = HashMap::from([("Strategy1".to_string(), serde_json::Value::from(123))]);

        let outcome = parse_component_levels(Some(map));

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("must be a string"));
    }

    #[rstest]
    fn test_parse_component_levels_invalid_level_returns_error() {
        let map = HashMap::from([(
            "Strategy1".to_string(),
            serde_json::Value::from("INVALID_LEVEL"),
        )]);

        let outcome = parse_component_levels(Some(map));

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid log level"));
    }

    #[rstest]
    fn test_parse_component_levels_none_returns_empty() {
        let levels = parse_component_levels(None).unwrap();
        assert!(levels.is_empty());
    }

    #[rstest]
    fn test_logging_clock_set_static_mode() {
        logging_clock_set_static_mode();
        let realtime = LOGGING_REALTIME.load(Ordering::Relaxed);
        assert!(!realtime);
    }

    #[rstest]
    fn test_logging_clock_set_realtime_mode() {
        logging_clock_set_realtime_mode();
        let realtime = LOGGING_REALTIME.load(Ordering::Relaxed);
        assert!(realtime);
    }

    #[rstest]
    fn test_logging_clock_set_static_time() {
        let test_time: u64 = 1_700_000_000_000_000_000;

        logging_clock_set_static_time(test_time);

        assert_eq!(get_atomic_clock_static().get_time_ns(), test_time);
    }

    #[rstest]
    fn test_logging_set_bypass() {
        logging_set_bypass();
        let bypassed = LOGGING_BYPASSED.load(Ordering::Relaxed);
        assert!(bypassed);
    }

    #[rstest]
    fn test_map_log_level_to_filter() {
        let expectations = [
            (LogLevel::Off, LevelFilter::Off),
            (LogLevel::Trace, LevelFilter::Trace),
            (LogLevel::Debug, LevelFilter::Debug),
            (LogLevel::Info, LevelFilter::Info),
            (LogLevel::Warning, LevelFilter::Warn),
            (LogLevel::Error, LevelFilter::Error),
        ];

        for (level, expected) in expectations {
            assert_eq!(map_log_level_to_filter(level), expected);
        }
    }

    #[rstest]
    fn test_ensure_logging_initialized_returns_consistent_value() {
        // Checks that ensure_logging_initialized() can be called repeatedly.
        // The lazy initialization is attempted at most once per process, so
        // only one code path can be exercised per process.
        //
        // An unset or unparsable NAUTILUS_LOG falls back to the default logger
        // config, so the returned value depends on whether logger
        // initialization succeeds in this process rather than on the variable
        // being set.
        //
        // The key invariant: multiple calls return the same value.
        let first_call = ensure_logging_initialized();
        let second_call = ensure_logging_initialized();

        assert_eq!(
            first_call, second_call,
            "ensure_logging_initialized must be idempotent"
        );
        assert_eq!(
            first_call,
            logging_is_initialized(),
            "ensure_logging_initialized return value must match logging_is_initialized()"
        );
    }

    #[rstest]
    fn test_ensure_logging_initialized_fast_path() {
        // When logging is not initialized there is nothing to check here:
        // running the initialization path would have side effects on other tests.
        if !logging_is_initialized() {
            return;
        }

        // Already initialized: the fast path must report `true` immediately.
        assert!(
            ensure_logging_initialized(),
            "Fast path should return true when already initialized"
        );
    }
}