nautilus_common/logging/
logger.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env,
19    fmt::Display,
20    str::FromStr,
21    sync::{atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26    kv::{ToValue, Value},
27    set_boxed_logger, set_max_level, Level, LevelFilter, Log, STATIC_MAX_LEVEL,
28};
29use nautilus_core::{
30    datetime::unix_nanos_to_iso8601,
31    time::{get_atomic_clock_realtime, get_atomic_clock_static},
32    UnixNanos, UUID4,
33};
34use nautilus_model::identifiers::TraderId;
35use serde::{Deserialize, Serialize, Serializer};
36use ustr::Ustr;
37
38use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
39use crate::{
40    enums::{LogColor, LogLevel},
41    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
42};
43
44const LOGGING: &str = "logging";
45
46#[cfg_attr(
47    feature = "python",
48    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
49)]
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub struct LoggerConfig {
52    /// Maximum log level to write to stdout.
53    pub stdout_level: LevelFilter,
54    /// Maximum log level to write to file (disabled is `Off`).
55    pub fileout_level: LevelFilter,
56    /// Per-component log levels, allowing finer-grained control.
57    component_level: HashMap<Ustr, LevelFilter>,
58    /// If logger is using ANSI color codes.
59    pub is_colored: bool,
60    /// If the configuration should be printed to stdout at initialization.
61    pub print_config: bool,
62}
63
64impl Default for LoggerConfig {
65    /// Creates a new default [`LoggerConfig`] instance.
66    fn default() -> Self {
67        Self {
68            stdout_level: LevelFilter::Info,
69            fileout_level: LevelFilter::Off,
70            component_level: HashMap::new(),
71            is_colored: false,
72            print_config: false,
73        }
74    }
75}
76
77impl LoggerConfig {
78    /// Creates a new [`LoggerConfig`] instance.
79    #[must_use]
80    pub const fn new(
81        stdout_level: LevelFilter,
82        fileout_level: LevelFilter,
83        component_level: HashMap<Ustr, LevelFilter>,
84        is_colored: bool,
85        print_config: bool,
86    ) -> Self {
87        Self {
88            stdout_level,
89            fileout_level,
90            component_level,
91            is_colored,
92            print_config,
93        }
94    }
95
96    #[must_use]
97    pub fn from_spec(spec: &str) -> Self {
98        let Self {
99            mut stdout_level,
100            mut fileout_level,
101            mut component_level,
102            mut is_colored,
103            mut print_config,
104        } = Self::default();
105        spec.split(';').for_each(|kv| {
106            if kv == "is_colored" {
107                is_colored = true;
108            } else if kv == "print_config" {
109                print_config = true;
110            } else {
111                let mut kv = kv.split('=');
112                if let (Some(k), Some(Ok(lvl))) = (kv.next(), kv.next().map(LevelFilter::from_str))
113                {
114                    if k == "stdout" {
115                        stdout_level = lvl;
116                    } else if k == "fileout" {
117                        fileout_level = lvl;
118                    } else {
119                        component_level.insert(Ustr::from(k), lvl);
120                    }
121                }
122            }
123        });
124
125        Self {
126            stdout_level,
127            fileout_level,
128            component_level,
129            is_colored,
130            print_config,
131        }
132    }
133
134    #[must_use]
135    pub fn from_env() -> Self {
136        match env::var("NAUTILUS_LOG") {
137            Ok(spec) => Self::from_spec(&spec),
138            Err(e) => panic!("Error parsing `LoggerConfig` spec: {e}"),
139        }
140    }
141}
142
143/// A high-performance logger utilizing a MPSC channel under the hood.
144///
145/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
146/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
147/// sent via an MPSC channel.
148#[derive(Debug)]
149pub struct Logger {
150    /// Configuration for logging levels and behavior.
151    pub config: LoggerConfig,
152    /// Transmitter for sending log events to the 'logging' thread.
153    tx: std::sync::mpsc::Sender<LogEvent>,
154}
155
156/// Represents a type of log event.
157pub enum LogEvent {
158    /// A log line event.
159    Log(LogLine),
160    /// A command to flush all logger buffers.
161    Flush,
162}
163
164/// Represents a log event which includes a message.
165#[derive(Clone, Debug, Serialize, Deserialize)]
166pub struct LogLine {
167    /// The log level for the event.
168    pub level: Level,
169    /// The color for the log message content.
170    pub color: LogColor,
171    /// The Nautilus system component the log event originated from.
172    pub component: Ustr,
173    /// The log message content.
174    pub message: String,
175}
176
177impl Display for LogLine {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
180    }
181}
182
183/// A wrapper around a log line that provides formatted and cached representations.
184///
185/// This struct contains a log line and provides various formatted versions
186/// of it, such as plain string, colored string, and JSON. It also caches the
187/// results for repeated calls, optimizing performance when the same message
188/// needs to be logged multiple times in different formats.
189pub struct LogLineWrapper {
190    /// The underlying log line that contains the log data.
191    line: LogLine,
192    /// Cached plain string representation of the log line.
193    cache: Option<String>,
194    /// Cached colored string representation of the log line.
195    colored: Option<String>,
196    /// The timestamp of when the log event occurred.
197    timestamp: String,
198    /// The ID of the trader associated with this log event.
199    trader_id: Ustr,
200}
201
202impl LogLineWrapper {
203    /// Creates a new [`LogLineWrapper`] instance.
204    #[must_use]
205    pub fn new(line: LogLine, trader_id: Ustr, timestamp: UnixNanos) -> Self {
206        Self {
207            line,
208            cache: None,
209            colored: None,
210            timestamp: unix_nanos_to_iso8601(timestamp),
211            trader_id,
212        }
213    }
214
215    /// Returns the plain log message string, caching the result.
216    ///
217    /// This method constructs the log line format and caches it for repeated calls. Useful when the
218    /// same log message needs to be printed multiple times.
219    pub fn get_string(&mut self) -> &str {
220        self.cache.get_or_insert_with(|| {
221            format!(
222                "{} [{}] {}.{}: {}\n",
223                self.timestamp,
224                self.line.level,
225                self.trader_id,
226                &self.line.component,
227                &self.line.message,
228            )
229        })
230    }
231
232    /// Returns the colored log message string, caching the result.
233    ///
234    /// This method constructs the colored log line format and caches the result
235    /// for repeated calls, providing the message with ANSI color codes if the
236    /// logger is configured to use colors.
237    pub fn get_colored(&mut self) -> &str {
238        self.colored.get_or_insert_with(|| {
239            format!(
240                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
241                self.timestamp,
242                &self.line.color.as_ansi(),
243                self.line.level,
244                self.trader_id,
245                &self.line.component,
246                &self.line.message,
247            )
248        })
249    }
250
251    /// Returns the log message as a JSON string.
252    ///
253    /// This method serializes the log line and its associated metadata
254    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
255    /// for structured logging or when logs need to be stored in a JSON format.
256    #[must_use]
257    pub fn get_json(&self) -> String {
258        let json_string =
259            serde_json::to_string(&self).expect("Error serializing log event to string");
260        format!("{json_string}\n")
261    }
262}
263
264impl Serialize for LogLineWrapper {
265    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
266    where
267        S: Serializer,
268    {
269        let mut json_obj = IndexMap::new();
270        json_obj.insert("timestamp".to_string(), self.timestamp.clone());
271        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
272        json_obj.insert("level".to_string(), self.line.level.to_string());
273        json_obj.insert("color".to_string(), self.line.color.to_string());
274        json_obj.insert("component".to_string(), self.line.component.to_string());
275        json_obj.insert("message".to_string(), self.line.message.to_string());
276
277        json_obj.serialize(serializer)
278    }
279}
280
281impl Log for Logger {
282    fn enabled(&self, metadata: &log::Metadata) -> bool {
283        !LOGGING_BYPASSED.load(Ordering::Relaxed)
284            && (metadata.level() == Level::Error
285                || metadata.level() <= self.config.stdout_level
286                || metadata.level() <= self.config.fileout_level)
287    }
288
289    fn log(&self, record: &log::Record) {
290        if self.enabled(record.metadata()) {
291            let key_values = record.key_values();
292            let color = key_values
293                .get("color".into())
294                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
295                .unwrap_or(LogColor::Normal);
296            let component = key_values.get("component".into()).map_or_else(
297                || Ustr::from(record.metadata().target()),
298                |v| Ustr::from(&v.to_string()),
299            );
300
301            let line = LogLine {
302                level: record.level(),
303                color,
304                component,
305                message: format!("{}", record.args()),
306            };
307            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
308                eprintln!("Error sending log event (receiver closed): {line}");
309            }
310        }
311    }
312
313    fn flush(&self) {
314        if let Err(e) = self.tx.send(LogEvent::Flush) {
315            eprintln!("Error sending flush log event (receiver closed): {e}");
316        }
317    }
318}
319
320#[allow(clippy::too_many_arguments)]
321impl Logger {
322    #[must_use]
323    pub fn init_with_env(
324        trader_id: TraderId,
325        instance_id: UUID4,
326        file_config: FileWriterConfig,
327    ) -> LogGuard {
328        let config = LoggerConfig::from_env();
329        Self::init_with_config(trader_id, instance_id, config, file_config)
330    }
331
332    /// Initializes the logger with the given configuration.
333    ///
334    /// # Examples
335    ///
336    /// ```rust
337    /// let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");
338    /// let file_config = FileWriterConfig::default();
339    /// let log_guard = Logger::init_with_config(trader_id, instance_id, config, file_config);
340    /// ```
341    #[must_use]
342    pub fn init_with_config(
343        trader_id: TraderId,
344        instance_id: UUID4,
345        config: LoggerConfig,
346        file_config: FileWriterConfig,
347    ) -> LogGuard {
348        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
349
350        let logger = Self {
351            tx,
352            config: config.clone(),
353        };
354
355        let print_config = config.print_config;
356        if print_config {
357            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
358            println!("Logger initialized with {config:?} {file_config:?}");
359        }
360
361        let mut handle: Option<std::thread::JoinHandle<()>> = None;
362        match set_boxed_logger(Box::new(logger)) {
363            Ok(()) => {
364                handle = Some(
365                    std::thread::Builder::new()
366                        .name(LOGGING.to_string())
367                        .spawn(move || {
368                            Self::handle_messages(
369                                trader_id.to_string(),
370                                instance_id.to_string(),
371                                config,
372                                file_config,
373                                rx,
374                            );
375                        })
376                        .expect("Error spawning thread '{LOGGING}'"),
377                );
378
379                let max_level = log::LevelFilter::Trace;
380                set_max_level(max_level);
381                if print_config {
382                    println!("Logger set as `log` implementation with max level {max_level}");
383                }
384            }
385            Err(e) => {
386                eprintln!("Cannot set logger because of error: {e}");
387            }
388        }
389
390        LogGuard::new(handle)
391    }
392
393    fn handle_messages(
394        trader_id: String,
395        instance_id: String,
396        config: LoggerConfig,
397        file_config: FileWriterConfig,
398        rx: std::sync::mpsc::Receiver<LogEvent>,
399    ) {
400        let LoggerConfig {
401            stdout_level,
402            fileout_level,
403            ref component_level,
404            is_colored,
405            print_config: _,
406        } = config;
407
408        let trader_id_cache = Ustr::from(&trader_id);
409
410        // Set up std I/O buffers
411        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
412        let mut stderr_writer = StderrWriter::new(is_colored);
413
414        // Conditionally create file writer based on fileout_level
415        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
416            None
417        } else {
418            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
419        };
420
421        // Continue to receive and handle log events until channel is hung up
422        while let Ok(event) = rx.recv() {
423            match event {
424                LogEvent::Flush => {
425                    break;
426                }
427                LogEvent::Log(line) => {
428                    let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
429                        get_atomic_clock_realtime().get_time_ns()
430                    } else {
431                        get_atomic_clock_static().get_time_ns()
432                    };
433
434                    let component_level = component_level.get(&line.component);
435
436                    // Check if the component exists in level_filters,
437                    // and if its level is greater than event.level.
438                    if let Some(&filter_level) = component_level {
439                        if line.level > filter_level {
440                            continue;
441                        }
442                    }
443
444                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache, timestamp);
445
446                    if stderr_writer.enabled(&wrapper.line) {
447                        if is_colored {
448                            stderr_writer.write(wrapper.get_colored());
449                        } else {
450                            stderr_writer.write(wrapper.get_string());
451                        }
452                    }
453
454                    if stdout_writer.enabled(&wrapper.line) {
455                        if is_colored {
456                            stdout_writer.write(wrapper.get_colored());
457                        } else {
458                            stdout_writer.write(wrapper.get_string());
459                        }
460                    }
461
462                    if let Some(ref mut writer) = file_writer_opt {
463                        if writer.enabled(&wrapper.line) {
464                            if writer.json_format {
465                                writer.write(&wrapper.get_json());
466                            } else {
467                                writer.write(wrapper.get_string());
468                            }
469                        }
470                    }
471                }
472            }
473        }
474    }
475}
476
477pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
478    let color = Value::from(color as u8);
479
480    match level {
481        LogLevel::Off => {}
482        LogLevel::Trace => {
483            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
484        }
485        LogLevel::Debug => {
486            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
487        }
488        LogLevel::Info => {
489            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
490        }
491        LogLevel::Warning => {
492            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
493        }
494        LogLevel::Error => {
495            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
496        }
497    }
498}
499
500#[cfg_attr(
501    feature = "python",
502    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
503)]
504#[derive(Debug)]
505pub struct LogGuard {
506    handle: Option<std::thread::JoinHandle<()>>,
507}
508
509impl LogGuard {
510    /// Creates a new [`LogGuard`] instance.
511    #[must_use]
512    pub const fn new(handle: Option<std::thread::JoinHandle<()>>) -> Self {
513        Self { handle }
514    }
515}
516
517impl Default for LogGuard {
518    /// Creates a new default [`LogGuard`] instance.
519    fn default() -> Self {
520        Self::new(None)
521    }
522}
523
524impl Drop for LogGuard {
525    fn drop(&mut self) {
526        log::logger().flush();
527        if let Some(handle) = self.handle.take() {
528            handle.join().expect("Error joining logging handle");
529        }
530    }
531}
532
533////////////////////////////////////////////////////////////////////////////////
534// Tests
535////////////////////////////////////////////////////////////////////////////////
536#[cfg(test)]
537mod tests {
538    use std::{collections::HashMap, time::Duration};
539
540    use log::LevelFilter;
541    use nautilus_core::UUID4;
542    use nautilus_model::identifiers::TraderId;
543    use rstest::*;
544    use serde_json::Value;
545    use tempfile::tempdir;
546    use ustr::Ustr;
547
548    use super::*;
549    use crate::{
550        enums::LogColor,
551        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
552        testing::wait_until,
553    };
554
555    #[rstest]
556    fn log_message_serialization() {
557        let log_message = LogLine {
558            level: log::Level::Info,
559            color: LogColor::Normal,
560            component: Ustr::from("Portfolio"),
561            message: "This is a log message".to_string(),
562        };
563
564        let serialized_json = serde_json::to_string(&log_message).unwrap();
565        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
566
567        assert_eq!(deserialized_value["level"], "INFO");
568        assert_eq!(deserialized_value["component"], "Portfolio");
569        assert_eq!(deserialized_value["message"], "This is a log message");
570    }
571
572    #[rstest]
573    fn log_config_parsing() {
574        let config =
575            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error");
576        assert_eq!(
577            config,
578            LoggerConfig {
579                stdout_level: LevelFilter::Info,
580                fileout_level: LevelFilter::Debug,
581                component_level: HashMap::from_iter(vec![(
582                    Ustr::from("RiskEngine"),
583                    LevelFilter::Error
584                )]),
585                is_colored: true,
586                print_config: false,
587            }
588        );
589    }
590
591    #[rstest]
592    fn log_config_parsing2() {
593        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;");
594        assert_eq!(
595            config,
596            LoggerConfig {
597                stdout_level: LevelFilter::Warn,
598                fileout_level: LevelFilter::Error,
599                component_level: HashMap::new(),
600                is_colored: false,
601                print_config: true,
602            }
603        );
604    }
605
606    #[rstest]
607    fn test_logging_to_file() {
608        let config = LoggerConfig {
609            fileout_level: LevelFilter::Debug,
610            ..Default::default()
611        };
612
613        let temp_dir = tempdir().expect("Failed to create temporary directory");
614        let file_config = FileWriterConfig {
615            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
616            ..Default::default()
617        };
618
619        let log_guard = Logger::init_with_config(
620            TraderId::from("TRADER-001"),
621            UUID4::new(),
622            config,
623            file_config,
624        );
625
626        logging_clock_set_static_mode();
627        logging_clock_set_static_time(1_650_000_000_000_000);
628
629        log::info!(
630            component = "RiskEngine";
631            "This is a test."
632        );
633
634        let mut log_contents = String::new();
635
636        wait_until(
637            || {
638                std::fs::read_dir(&temp_dir)
639                    .expect("Failed to read directory")
640                    .filter_map(Result::ok)
641                    .any(|entry| entry.path().is_file())
642            },
643            Duration::from_secs(2),
644        );
645
646        drop(log_guard); // Ensure log buffers are flushed
647
648        wait_until(
649            || {
650                let log_file_path = std::fs::read_dir(&temp_dir)
651                    .expect("Failed to read directory")
652                    .filter_map(Result::ok)
653                    .find(|entry| entry.path().is_file())
654                    .expect("No files found in directory")
655                    .path();
656                dbg!(&log_file_path);
657                log_contents =
658                    std::fs::read_to_string(log_file_path).expect("Error while reading log file");
659                !log_contents.is_empty()
660            },
661            Duration::from_secs(2),
662        );
663
664        assert_eq!(
665            log_contents,
666            "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
667        );
668    }
669
670    #[rstest]
671    fn test_log_component_level_filtering() {
672        let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");
673
674        let temp_dir = tempdir().expect("Failed to create temporary directory");
675        let file_config = FileWriterConfig {
676            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
677            ..Default::default()
678        };
679
680        let log_guard = Logger::init_with_config(
681            TraderId::from("TRADER-001"),
682            UUID4::new(),
683            config,
684            file_config,
685        );
686
687        logging_clock_set_static_mode();
688        logging_clock_set_static_time(1_650_000_000_000_000);
689
690        log::info!(
691            component = "RiskEngine";
692            "This is a test."
693        );
694
695        drop(log_guard); // Ensure log buffers are flushed
696
697        wait_until(
698            || {
699                if let Some(log_file) = std::fs::read_dir(&temp_dir)
700                    .expect("Failed to read directory")
701                    .filter_map(Result::ok)
702                    .find(|entry| entry.path().is_file())
703                {
704                    let log_file_path = log_file.path();
705                    let log_contents = std::fs::read_to_string(log_file_path)
706                        .expect("Error while reading log file");
707                    !log_contents.contains("RiskEngine")
708                } else {
709                    false
710                }
711            },
712            Duration::from_secs(3),
713        );
714
715        assert!(
716            std::fs::read_dir(&temp_dir)
717                .expect("Failed to read directory")
718                .filter_map(Result::ok)
719                .any(|entry| entry.path().is_file()),
720            "Log file exists"
721        );
722    }
723
724    #[rstest]
725    fn test_logging_to_file_in_json_format() {
726        let config =
727            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info");
728
729        let temp_dir = tempdir().expect("Failed to create temporary directory");
730        let file_config = FileWriterConfig {
731            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
732            file_format: Some("json".to_string()),
733            ..Default::default()
734        };
735
736        let log_guard = Logger::init_with_config(
737            TraderId::from("TRADER-001"),
738            UUID4::new(),
739            config,
740            file_config,
741        );
742
743        logging_clock_set_static_mode();
744        logging_clock_set_static_time(1_650_000_000_000_000);
745
746        log::info!(
747            component = "RiskEngine";
748            "This is a test."
749        );
750
751        let mut log_contents = String::new();
752
753        drop(log_guard); // Ensure log buffers are flushed
754
755        wait_until(
756            || {
757                if let Some(log_file) = std::fs::read_dir(&temp_dir)
758                    .expect("Failed to read directory")
759                    .filter_map(Result::ok)
760                    .find(|entry| entry.path().is_file())
761                {
762                    let log_file_path = log_file.path();
763                    log_contents = std::fs::read_to_string(log_file_path)
764                        .expect("Error while reading log file");
765                    !log_contents.is_empty()
766                } else {
767                    false
768                }
769            },
770            Duration::from_secs(2),
771        );
772
773        assert_eq!(
774        log_contents,
775        "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
776    );
777    }
778}