nautilus_common/logging/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The logging framework for Nautilus systems.
17//!
18//! This module implements a high-performance logging subsystem that operates in a separate thread
19//! using an MPSC channel for log message delivery. The system uses reference counting to track
20//! active `LogGuard` instances, ensuring the logging thread completes all pending writes before
21//! termination.
22//!
23//! # LogGuard Reference Counting
24//!
25//! The logging system maintains a global count of active `LogGuard` instances using an atomic
26//! counter (`LOGGING_GUARDS_ACTIVE`). When a `LogGuard` is created, the counter is incremented,
27//! and when dropped, it's decremented. When the last `LogGuard` is dropped (counter reaches zero),
28//! the logging thread is properly joined to ensure all buffered log messages are written to their
29//! destinations before the process terminates.
30//!
31//! The system supports a maximum of 255 concurrent `LogGuard` instances. Attempting to create
32//! more will cause a panic.
33
34pub mod config;
35pub mod headers;
36pub mod logger;
37pub mod macros;
38pub mod writer;
39
40use std::{
41    collections::HashMap,
42    env,
43    str::FromStr,
44    sync::atomic::{AtomicBool, AtomicU8, Ordering},
45};
46
47use ahash::AHashMap;
48use log::LevelFilter;
49// Re-exports
50pub use macros::{log_debug, log_error, log_info, log_trace, log_warn};
51use nautilus_core::{UUID4, time::get_atomic_clock_static};
52use nautilus_model::identifiers::TraderId;
53use tracing_subscriber::EnvFilter;
54use ustr::Ustr;
55
56use self::{
57    logger::{LogGuard, Logger, LoggerConfig},
58    writer::FileWriterConfig,
59};
60use crate::enums::LogLevel;
61
62pub const RECV: &str = "<--";
63pub const SEND: &str = "-->";
64pub const CMD: &str = "[CMD]";
65pub const EVT: &str = "[EVT]";
66pub const DOC: &str = "[DOC]";
67pub const RPT: &str = "[RPT]";
68pub const REQ: &str = "[REQ]";
69pub const RES: &str = "[RES]";
70
71static LOGGING_INITIALIZED: AtomicBool = AtomicBool::new(false);
72static LOGGING_BYPASSED: AtomicBool = AtomicBool::new(false);
73static LOGGING_REALTIME: AtomicBool = AtomicBool::new(true);
74static LOGGING_COLORED: AtomicBool = AtomicBool::new(true);
75static LOGGING_GUARDS_ACTIVE: AtomicU8 = AtomicU8::new(0);
76
77/// Returns whether the core logger is enabled.
78pub fn logging_is_initialized() -> bool {
79    LOGGING_INITIALIZED.load(Ordering::Relaxed)
80}
81
82/// Sets the logging subsystem to bypass mode.
83pub fn logging_set_bypass() {
84    LOGGING_BYPASSED.store(true, Ordering::Relaxed);
85}
86
87/// Shuts down the logging subsystem.
88pub fn logging_shutdown() {
89    // Perform a graceful shutdown: prevent new logs, signal Close, drain and join.
90    // Delegates to logger implementation which has access to the internals.
91    crate::logging::logger::shutdown_graceful();
92}
93
94/// Returns whether the core logger is using ANSI colors.
95pub fn logging_is_colored() -> bool {
96    LOGGING_COLORED.load(Ordering::Relaxed)
97}
98
99/// Sets the global logging clock to real-time mode.
100pub fn logging_clock_set_realtime_mode() {
101    LOGGING_REALTIME.store(true, Ordering::Relaxed);
102}
103
104/// Sets the global logging clock to static mode.
105pub fn logging_clock_set_static_mode() {
106    LOGGING_REALTIME.store(false, Ordering::Relaxed);
107}
108
109/// Sets the global logging clock static time with the given UNIX timestamp (nanoseconds).
110pub fn logging_clock_set_static_time(time_ns: u64) {
111    let clock = get_atomic_clock_static();
112    clock.set_time(time_ns.into());
113}
114
115/// Initialize tracing.
116///
117/// Tracing is meant to be used to trace/debug async Rust code. It can be
118/// configured to filter modules and write up to a specific level by passing
119/// a configuration using the `RUST_LOG` environment variable.
120///
121/// # Safety
122///
123/// Should only be called once during an applications run, ideally at the
124/// beginning of the run.
125///
126/// # Errors
127///
128/// Returns an error if tracing subscriber fails to initialize.
129pub fn init_tracing() -> anyhow::Result<()> {
130    // Skip tracing initialization if `RUST_LOG` is not set
131    if let Ok(v) = env::var("RUST_LOG") {
132        let env_filter = EnvFilter::new(v.clone());
133
134        if tracing_subscriber::fmt()
135            .with_env_filter(env_filter)
136            .try_init()
137            .is_ok()
138        {
139            println!("Initialized tracing logs with RUST_LOG={v}");
140        }
141    }
142    Ok(())
143}
144
145/// Initialize logging.
146///
147/// Logging should be used for Python and sync Rust logic which is most of
148/// the components in the [nautilus_trader](https://pypi.org/project/nautilus_trader) package.
149/// Logging can be configured to filter components and write up to a specific level only
150/// by passing a configuration using the `NAUTILUS_LOG` environment variable.
151///
152/// # Safety
153///
154/// Should only be called once during an applications run, ideally at the
155/// beginning of the run.
156///
157/// Logging should be used for Python and sync Rust logic which is most of
158/// the components in the `nautilus_trader` package.
159/// Logging can be configured via the `NAUTILUS_LOG` environment variable.
160///
161/// # Errors
162///
163/// Returns an error if the logging subsystem fails to initialize.
164pub fn init_logging(
165    trader_id: TraderId,
166    instance_id: UUID4,
167    config: LoggerConfig,
168    file_config: FileWriterConfig,
169) -> anyhow::Result<LogGuard> {
170    Logger::init_with_config(trader_id, instance_id, config, file_config)
171}
172
173#[must_use]
174pub const fn map_log_level_to_filter(log_level: LogLevel) -> LevelFilter {
175    match log_level {
176        LogLevel::Off => LevelFilter::Off,
177        LogLevel::Trace => LevelFilter::Trace,
178        LogLevel::Debug => LevelFilter::Debug,
179        LogLevel::Info => LevelFilter::Info,
180        LogLevel::Warning => LevelFilter::Warn,
181        LogLevel::Error => LevelFilter::Error,
182    }
183}
184
185/// Parses a string into a [`LevelFilter`].
186///
187/// # Errors
188///
189/// Returns an error if the provided string is not a valid `LevelFilter`.
190pub fn parse_level_filter_str(s: &str) -> anyhow::Result<LevelFilter> {
191    let mut log_level_str = s.to_string().to_uppercase();
192    if log_level_str == "WARNING" {
193        log_level_str = "WARN".to_string();
194    }
195    LevelFilter::from_str(&log_level_str)
196        .map_err(|_| anyhow::anyhow!("Invalid log level string: '{s}'"))
197}
198
199/// Parses component-specific log levels from a JSON value map.
200///
201/// # Errors
202///
203/// Returns an error if a JSON value in the map is not a string or is not a valid log level.
204pub fn parse_component_levels(
205    original_map: Option<HashMap<String, serde_json::Value>>,
206) -> anyhow::Result<AHashMap<Ustr, LevelFilter>> {
207    match original_map {
208        Some(map) => {
209            let mut new_map = AHashMap::new();
210            for (key, value) in map {
211                let ustr_key = Ustr::from(&key);
212                let s = value.as_str().ok_or_else(|| {
213                    anyhow::anyhow!(
214                        "Component log level for '{key}' must be a string, was: {value}"
215                    )
216                })?;
217                let lvl = parse_level_filter_str(s)?;
218                new_map.insert(ustr_key, lvl);
219            }
220            Ok(new_map)
221        }
222        None => Ok(AHashMap::new()),
223    }
224}
225
226/// Logs that a task has started using `tracing::debug!`.
227pub fn log_task_started(task_name: &str) {
228    tracing::debug!("Started task '{task_name}'");
229}
230
231/// Logs that a task has stopped using `tracing::debug!`.
232pub fn log_task_stopped(task_name: &str) {
233    tracing::debug!("Stopped task '{task_name}'");
234}
235
236/// Logs that a task is being awaited using `tracing::debug!`.
237pub fn log_task_awaiting(task_name: &str) {
238    tracing::debug!("Awaiting task '{task_name}'");
239}
240
241/// Logs that a task was aborted using `tracing::debug!`.
242pub fn log_task_aborted(task_name: &str) {
243    tracing::debug!("Aborted task '{task_name}'");
244}
245
246/// Logs that there was an error in a task `tracing::error!`.
247pub fn log_task_error(task_name: &str, e: &anyhow::Error) {
248    tracing::error!("Error in task '{task_name}': {e}");
249}
250
251#[cfg(test)]
252mod tests {
253    use rstest::rstest;
254
255    use super::*;
256
257    #[rstest]
258    #[case("DEBUG", LevelFilter::Debug)]
259    #[case("debug", LevelFilter::Debug)]
260    #[case("Debug", LevelFilter::Debug)]
261    #[case("DeBuG", LevelFilter::Debug)]
262    #[case("INFO", LevelFilter::Info)]
263    #[case("info", LevelFilter::Info)]
264    #[case("WARNING", LevelFilter::Warn)]
265    #[case("warning", LevelFilter::Warn)]
266    #[case("WARN", LevelFilter::Warn)]
267    #[case("warn", LevelFilter::Warn)]
268    #[case("ERROR", LevelFilter::Error)]
269    #[case("error", LevelFilter::Error)]
270    #[case("OFF", LevelFilter::Off)]
271    #[case("off", LevelFilter::Off)]
272    #[case("TRACE", LevelFilter::Trace)]
273    #[case("trace", LevelFilter::Trace)]
274    fn test_parse_level_filter_str_case_insensitive(
275        #[case] input: &str,
276        #[case] expected: LevelFilter,
277    ) {
278        let result = parse_level_filter_str(input).unwrap();
279        assert_eq!(result, expected);
280    }
281
282    #[rstest]
283    #[case("INVALID")]
284    #[case("DEBG")]
285    #[case("WARNINGG")]
286    #[case("")]
287    #[case("INFO123")]
288    fn test_parse_level_filter_str_invalid_returns_error(#[case] invalid_input: &str) {
289        let result = parse_level_filter_str(invalid_input);
290
291        assert!(result.is_err());
292        assert!(
293            result
294                .unwrap_err()
295                .to_string()
296                .contains("Invalid log level")
297        );
298    }
299
300    #[rstest]
301    fn test_parse_component_levels_valid() {
302        let mut map = HashMap::new();
303        map.insert(
304            "Strategy1".to_string(),
305            serde_json::Value::String("DEBUG".to_string()),
306        );
307        map.insert(
308            "Strategy2".to_string(),
309            serde_json::Value::String("info".to_string()),
310        );
311
312        let result = parse_component_levels(Some(map)).unwrap();
313
314        assert_eq!(result.len(), 2);
315        assert_eq!(result[&Ustr::from("Strategy1")], LevelFilter::Debug);
316        assert_eq!(result[&Ustr::from("Strategy2")], LevelFilter::Info);
317    }
318
319    #[rstest]
320    fn test_parse_component_levels_non_string_value_returns_error() {
321        let mut map = HashMap::new();
322        map.insert(
323            "Strategy1".to_string(),
324            serde_json::Value::Number(123.into()),
325        );
326
327        let result = parse_component_levels(Some(map));
328
329        assert!(result.is_err());
330        assert!(result.unwrap_err().to_string().contains("must be a string"));
331    }
332
333    #[rstest]
334    fn test_parse_component_levels_invalid_level_returns_error() {
335        let mut map = HashMap::new();
336        map.insert(
337            "Strategy1".to_string(),
338            serde_json::Value::String("INVALID_LEVEL".to_string()),
339        );
340
341        let result = parse_component_levels(Some(map));
342
343        assert!(result.is_err());
344        assert!(
345            result
346                .unwrap_err()
347                .to_string()
348                .contains("Invalid log level")
349        );
350    }
351
352    #[rstest]
353    fn test_parse_component_levels_none_returns_empty() {
354        let result = parse_component_levels(None).unwrap();
355        assert_eq!(result.len(), 0);
356    }
357
358    #[rstest]
359    fn test_logging_clock_set_static_mode() {
360        logging_clock_set_static_mode();
361        assert!(!LOGGING_REALTIME.load(Ordering::Relaxed));
362    }
363
364    #[rstest]
365    fn test_logging_clock_set_realtime_mode() {
366        logging_clock_set_realtime_mode();
367        assert!(LOGGING_REALTIME.load(Ordering::Relaxed));
368    }
369
370    #[rstest]
371    fn test_logging_clock_set_static_time() {
372        let test_time: u64 = 1_700_000_000_000_000_000;
373        logging_clock_set_static_time(test_time);
374        let clock = get_atomic_clock_static();
375        assert_eq!(clock.get_time_ns(), test_time);
376    }
377
378    #[rstest]
379    fn test_logging_set_bypass() {
380        logging_set_bypass();
381        assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
382    }
383
384    #[rstest]
385    fn test_map_log_level_to_filter() {
386        assert_eq!(map_log_level_to_filter(LogLevel::Off), LevelFilter::Off);
387        assert_eq!(map_log_level_to_filter(LogLevel::Trace), LevelFilter::Trace);
388        assert_eq!(map_log_level_to_filter(LogLevel::Debug), LevelFilter::Debug);
389        assert_eq!(map_log_level_to_filter(LogLevel::Info), LevelFilter::Info);
390        assert_eq!(
391            map_log_level_to_filter(LogLevel::Warning),
392            LevelFilter::Warn
393        );
394        assert_eq!(map_log_level_to_filter(LogLevel::Error), LevelFilter::Error);
395    }
396}