nautilus_common/logging/
logger.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env,
19    fmt::Display,
20    str::FromStr,
21    sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27    kv::{ToValue, Value},
28    set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31    UUID4, UnixNanos,
32    datetime::unix_nanos_to_iso8601,
33    time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41    enums::{LogColor, LogLevel},
42    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
/// Name given to the background thread spawned by [`Logger::init_with_config`].
const LOGGING: &str = "logging";
/// Key under which a log record's key-values carry the color code read by [`Logger`].
const KV_COLOR: &str = "color";
/// Key under which a log record's key-values carry the originating component name.
const KV_COMPONENT: &str = "component";

/// Global log sender which allows multiple log guards per process.
///
/// Set once during logger initialization; [`LogGuard::new`] clones it for every guard.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Global handle to the logging thread - only one thread exists per process.
///
/// Holds `None` until the thread is spawned, and again after the handle has been taken for joining.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
/// Configuration for a [`Logger`]: output level filters, per-component filters and behavior flags.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum log level to write to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum log level to write to file (disabled is `Off`).
    pub fileout_level: LevelFilter,
    /// Per-component log levels, allowing finer-grained control.
    ///
    /// Keyed by component name. A log line whose component has an entry here is dropped by the
    /// logging thread when its level is more verbose than that entry.
    component_level: HashMap<Ustr, LevelFilter>,
    /// If only components with explicit component-level filters should be logged.
    pub log_components_only: bool,
    /// If logger is using ANSI color codes.
    pub is_colored: bool,
    /// If the configuration should be printed to stdout at initialization.
    pub print_config: bool,
}
74
75impl Default for LoggerConfig {
76    /// Creates a new default [`LoggerConfig`] instance.
77    fn default() -> Self {
78        Self {
79            stdout_level: LevelFilter::Info,
80            fileout_level: LevelFilter::Off,
81            component_level: HashMap::new(),
82            log_components_only: false,
83            is_colored: true,
84            print_config: false,
85        }
86    }
87}
88
89impl LoggerConfig {
90    /// Creates a new [`LoggerConfig`] instance.
91    #[must_use]
92    pub const fn new(
93        stdout_level: LevelFilter,
94        fileout_level: LevelFilter,
95        component_level: HashMap<Ustr, LevelFilter>,
96        log_components_only: bool,
97        is_colored: bool,
98        print_config: bool,
99    ) -> Self {
100        Self {
101            stdout_level,
102            fileout_level,
103            component_level,
104            log_components_only,
105            is_colored,
106            print_config,
107        }
108    }
109
110    /// # Errors
111    ///
112    /// Returns an error if the spec string is invalid.
113    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
114        let mut config = Self::default();
115        for kv in spec.split(';') {
116            let kv = kv.trim();
117            if kv.is_empty() {
118                continue;
119            }
120            let kv_lower = kv.to_lowercase(); // For case-insensitive comparison
121
122            if kv_lower == "log_components_only" {
123                config.log_components_only = true;
124            } else if kv_lower == "is_colored" {
125                config.is_colored = true;
126            } else if kv_lower == "print_config" {
127                config.print_config = true;
128            } else {
129                let parts: Vec<&str> = kv.split('=').collect();
130                if parts.len() != 2 {
131                    anyhow::bail!("Invalid spec pair: {}", kv);
132                }
133                let k = parts[0].trim(); // Trim key
134                let v = parts[1].trim(); // Trim value
135                let lvl = LevelFilter::from_str(v)
136                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
137                let k_lower = k.to_lowercase(); // Case-insensitive key matching
138                match k_lower.as_str() {
139                    "stdout" => config.stdout_level = lvl,
140                    "fileout" => config.fileout_level = lvl,
141                    _ => {
142                        config.component_level.insert(Ustr::from(k), lvl);
143                    }
144                }
145            }
146        }
147        Ok(config)
148    }
149
150    /// Retrieves the logger configuration from the "`NAUTILUS_LOG`" environment variable.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the variable is unset or invalid.
155    pub fn from_env() -> anyhow::Result<Self> {
156        let spec = env::var("NAUTILUS_LOG")?;
157        Self::from_spec(&spec)
158    }
159}
160
/// A high-performance logger utilizing an MPSC channel under the hood.
///
/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
/// sent via an MPSC channel.
///
/// The [`Log`] implementation only builds a [`LogLine`] and sends it over the channel;
/// formatting and writing happen on the spawned thread.
#[derive(Debug)]
pub struct Logger {
    /// Configuration for logging levels and behavior.
    pub config: LoggerConfig,
    /// Transmitter for sending log events to the 'logging' thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
173
/// Represents a type of log event sent over the channel to the logging thread.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line event.
    Log(LogLine),
    /// A command to flush all logger buffers.
    ///
    /// The logging thread flushes the stdout and stderr writers and, if present, the file writer.
    Flush,
    /// A command to close the logger.
    ///
    /// The logging thread flushes, drains any events still queued, flushes again and exits.
    Close,
}
184
/// Represents a log event which includes a message.
///
/// Its [`Display`] form is `[level] component: message`; the timestamp and color are not
/// part of that output.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp for the event.
    pub timestamp: UnixNanos,
    /// The log level for the event.
    pub level: Level,
    /// The color for the log message content.
    pub color: LogColor,
    /// The Nautilus system component the log event originated from.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}
199
200impl Display for LogLine {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
203    }
204}
205
/// A wrapper around a log line that provides formatted and cached representations.
///
/// This struct contains a log line and provides various formatted versions
/// of it, such as plain string, colored string, and JSON. The plain and colored
/// strings are cached after the first call, so the same message can be written to
/// several outputs without being formatted again. The JSON form is not cached and
/// is serialized on every call.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line that contains the log data.
    line: LogLine,
    /// Cached plain string representation of the log line.
    cache: Option<String>,
    /// Cached colored string representation of the log line.
    colored: Option<String>,
    /// The ID of the trader associated with this log event.
    trader_id: Ustr,
}
223
224impl LogLineWrapper {
225    /// Creates a new [`LogLineWrapper`] instance.
226    #[must_use]
227    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
228        Self {
229            line,
230            cache: None,
231            colored: None,
232            trader_id,
233        }
234    }
235
236    /// Returns the plain log message string, caching the result.
237    ///
238    /// This method constructs the log line format and caches it for repeated calls. Useful when the
239    /// same log message needs to be printed multiple times.
240    pub fn get_string(&mut self) -> &str {
241        self.cache.get_or_insert_with(|| {
242            format!(
243                "{} [{}] {}.{}: {}\n",
244                unix_nanos_to_iso8601(self.line.timestamp),
245                self.line.level,
246                self.trader_id,
247                &self.line.component,
248                &self.line.message,
249            )
250        })
251    }
252
253    /// Returns the colored log message string, caching the result.
254    ///
255    /// This method constructs the colored log line format and caches the result
256    /// for repeated calls, providing the message with ANSI color codes if the
257    /// logger is configured to use colors.
258    pub fn get_colored(&mut self) -> &str {
259        self.colored.get_or_insert_with(|| {
260            format!(
261                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
262                unix_nanos_to_iso8601(self.line.timestamp),
263                &self.line.color.as_ansi(),
264                self.line.level,
265                self.trader_id,
266                &self.line.component,
267                &self.line.message,
268            )
269        })
270    }
271
272    /// Returns the log message as a JSON string.
273    ///
274    /// This method serializes the log line and its associated metadata
275    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
276    /// for structured logging or when logs need to be stored in a JSON format.
277    /// # Panics
278    ///
279    /// Panics if serialization of the log event to JSON fails.
280    #[must_use]
281    pub fn get_json(&self) -> String {
282        let json_string =
283            serde_json::to_string(&self).expect("Error serializing log event to string");
284        format!("{json_string}\n")
285    }
286}
287
288impl Serialize for LogLineWrapper {
289    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290    where
291        S: Serializer,
292    {
293        let mut json_obj = IndexMap::new();
294        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
295        json_obj.insert("timestamp".to_string(), timestamp);
296        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
297        json_obj.insert("level".to_string(), self.line.level.to_string());
298        json_obj.insert("color".to_string(), self.line.color.to_string());
299        json_obj.insert("component".to_string(), self.line.component.to_string());
300        json_obj.insert("message".to_string(), self.line.message.to_string());
301
302        json_obj.serialize(serializer)
303    }
304}
305
306impl Log for Logger {
307    fn enabled(&self, metadata: &log::Metadata) -> bool {
308        !LOGGING_BYPASSED.load(Ordering::Relaxed)
309            && (metadata.level() == Level::Error
310                || metadata.level() <= self.config.stdout_level
311                || metadata.level() <= self.config.fileout_level)
312    }
313
314    fn log(&self, record: &log::Record) {
315        if self.enabled(record.metadata()) {
316            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
317                get_atomic_clock_realtime().get_time_ns()
318            } else {
319                get_atomic_clock_static().get_time_ns()
320            };
321            let level = record.level();
322            let key_values = record.key_values();
323            let color: LogColor = key_values
324                .get(KV_COLOR.into())
325                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
326                .unwrap_or(level.into());
327            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
328                || Ustr::from(record.metadata().target()),
329                |v| Ustr::from(&v.to_string()),
330            );
331
332            let line = LogLine {
333                timestamp,
334                level,
335                color,
336                component,
337                message: format!("{}", record.args()),
338            };
339            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
340                eprintln!("Error sending log event (receiver closed): {line}");
341            }
342        }
343    }
344
345    fn flush(&self) {
346        // Don't attempt to flush if we're already bypassed/shutdown
347        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
348            return;
349        }
350
351        if let Err(e) = self.tx.send(LogEvent::Flush) {
352            eprintln!("Error sending flush log event: {e}");
353        }
354    }
355}
356
357#[allow(clippy::too_many_arguments)]
358impl Logger {
359    /// Initializes the logger based on the `NAUTILUS_LOG` environment variable.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if reading the environment variable or parsing the configuration fails.
364    pub fn init_with_env(
365        trader_id: TraderId,
366        instance_id: UUID4,
367        file_config: FileWriterConfig,
368    ) -> anyhow::Result<LogGuard> {
369        let config = LoggerConfig::from_env()?;
370        Self::init_with_config(trader_id, instance_id, config, file_config)
371    }
372
373    /// Initializes the logger with the given configuration.
374    ///
375    /// # Examples
376    ///
377    /// ```rust
378    /// let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");
379    /// let file_config = FileWriterConfig::default();
380    /// let log_guard = Logger::init_with_config(trader_id, instance_id, config, file_config);
381    /// ```
382    /// Initializes the logger with the given `LoggerConfig` and `FileWriterConfig`.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if the logger fails to register or initialize the background thread.
387    pub fn init_with_config(
388        trader_id: TraderId,
389        instance_id: UUID4,
390        config: LoggerConfig,
391        file_config: FileWriterConfig,
392    ) -> anyhow::Result<LogGuard> {
393        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
394
395        let logger_tx = tx.clone();
396        let logger = Self {
397            tx: logger_tx,
398            config: config.clone(),
399        };
400
401        set_boxed_logger(Box::new(logger))?;
402
403        // Store the sender globally so additional guards can be created
404        if LOGGER_TX.set(tx.clone()).is_err() {
405            debug_assert!(
406                false,
407                "LOGGER_TX already set - re-initialization not supported"
408            );
409        }
410
411        let print_config = config.print_config;
412        if print_config {
413            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
414            println!("Logger initialized with {config:?} {file_config:?}");
415        }
416
417        let handle = std::thread::Builder::new()
418            .name(LOGGING.to_string())
419            .spawn(move || {
420                Self::handle_messages(
421                    trader_id.to_string(),
422                    instance_id.to_string(),
423                    config,
424                    file_config,
425                    rx,
426                );
427            })?;
428
429        // Store the handle globally
430        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
431            debug_assert!(
432                handle_guard.is_none(),
433                "LOGGER_HANDLE already set - re-initialization not supported"
434            );
435            *handle_guard = Some(handle);
436        }
437
438        let max_level = log::LevelFilter::Trace;
439        set_max_level(max_level);
440
441        if print_config {
442            println!("Logger set as `log` implementation with max level {max_level}");
443        }
444
445        LogGuard::new()
446            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
447    }
448
449    fn handle_messages(
450        trader_id: String,
451        instance_id: String,
452        config: LoggerConfig,
453        file_config: FileWriterConfig,
454        rx: std::sync::mpsc::Receiver<LogEvent>,
455    ) {
456        let LoggerConfig {
457            stdout_level,
458            fileout_level,
459            component_level,
460            log_components_only,
461            is_colored,
462            print_config: _,
463        } = config;
464
465        let trader_id_cache = Ustr::from(&trader_id);
466
467        // Set up std I/O buffers
468        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
469        let mut stderr_writer = StderrWriter::new(is_colored);
470
471        // Conditionally create file writer based on fileout_level
472        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
473            None
474        } else {
475            FileWriter::new(
476                trader_id.clone(),
477                instance_id.clone(),
478                file_config.clone(),
479                fileout_level,
480            )
481        };
482
483        let process_event = |event: LogEvent,
484                             stdout_writer: &mut StdoutWriter,
485                             stderr_writer: &mut StderrWriter,
486                             file_writer_opt: &mut Option<FileWriter>| {
487            match event {
488                LogEvent::Log(line) => {
489                    let component_filter_level = component_level.get(&line.component);
490
491                    if log_components_only && component_filter_level.is_none() {
492                        return;
493                    }
494
495                    if let Some(&filter_level) = component_filter_level
496                        && line.level > filter_level
497                    {
498                        return;
499                    }
500
501                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
502
503                    if stderr_writer.enabled(&wrapper.line) {
504                        if is_colored {
505                            stderr_writer.write(wrapper.get_colored());
506                        } else {
507                            stderr_writer.write(wrapper.get_string());
508                        }
509                    }
510
511                    if stdout_writer.enabled(&wrapper.line) {
512                        if is_colored {
513                            stdout_writer.write(wrapper.get_colored());
514                        } else {
515                            stdout_writer.write(wrapper.get_string());
516                        }
517                    }
518
519                    if let Some(file_writer) = file_writer_opt
520                        && file_writer.enabled(&wrapper.line)
521                    {
522                        if file_writer.json_format {
523                            file_writer.write(&wrapper.get_json());
524                        } else {
525                            file_writer.write(wrapper.get_string());
526                        }
527                    }
528                }
529                LogEvent::Flush => {
530                    stdout_writer.flush();
531                    stderr_writer.flush();
532
533                    if let Some(file_writer) = file_writer_opt {
534                        file_writer.flush();
535                    }
536                }
537                LogEvent::Close => {
538                    // Close handled in the main loop; ignore here.
539                }
540            }
541        };
542
543        // Continue to receive and handle log events until channel is hung up
544        while let Ok(event) = rx.recv() {
545            match event {
546                LogEvent::Log(_) | LogEvent::Flush => process_event(
547                    event,
548                    &mut stdout_writer,
549                    &mut stderr_writer,
550                    &mut file_writer_opt,
551                ),
552                LogEvent::Close => {
553                    // First flush what's been written so far
554                    stdout_writer.flush();
555                    stderr_writer.flush();
556
557                    if let Some(ref mut file_writer) = file_writer_opt {
558                        file_writer.flush();
559                    }
560
561                    // Drain any remaining events that may have raced with shutdown
562                    // This ensures logs enqueued just before/around shutdown aren't lost.
563                    while let Ok(evt) = rx.try_recv() {
564                        match evt {
565                            LogEvent::Close => (), // ignore extra Close events
566                            _ => process_event(
567                                evt,
568                                &mut stdout_writer,
569                                &mut stderr_writer,
570                                &mut file_writer_opt,
571                            ),
572                        }
573                    }
574
575                    // Final flush after draining
576                    stdout_writer.flush();
577                    stderr_writer.flush();
578
579                    if let Some(ref mut file_writer) = file_writer_opt {
580                        file_writer.flush();
581                    }
582
583                    break;
584                }
585            }
586        }
587    }
588}
589
590/// Gracefully shuts down the logging subsystem by preventing new log events,
591/// signaling the logging thread to close, draining pending messages, and joining
592/// the logging thread.
593pub(crate) fn shutdown_graceful() {
594    // Prevent further logging
595    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
596    log::set_max_level(log::LevelFilter::Off);
597
598    // Signal Close if the sender exists
599    if let Some(tx) = LOGGER_TX.get() {
600        let _ = tx.send(LogEvent::Close);
601    }
602
603    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
604        && let Some(handle) = handle_guard.take()
605        && handle.thread().id() != std::thread::current().id()
606    {
607        let _ = handle.join();
608    }
609
610    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
611}
612
613pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
614    let color = Value::from(color as u8);
615
616    match level {
617        LogLevel::Off => {}
618        LogLevel::Trace => {
619            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
620        }
621        LogLevel::Debug => {
622            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
623        }
624        LogLevel::Info => {
625            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
626        }
627        LogLevel::Warning => {
628            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
629        }
630        LogLevel::Error => {
631            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
632        }
633    }
634}
635
/// A guard that manages the lifecycle of the logging subsystem.
///
/// `LogGuard` ensures the logging thread remains active while instances exist and properly
/// terminates when all guards are dropped. The system uses reference counting to track active
/// guards - when the last `LogGuard` is dropped, the logging thread is joined to ensure all
/// pending log messages are written before the process terminates.
///
/// # Reference Counting
///
/// The logging system maintains a global atomic counter of active `LogGuard` instances. This
/// ensures that:
/// - The logging thread remains active as long as at least one `LogGuard` exists.
/// - All log messages are properly flushed when intermediate guards are dropped.
/// - The logging thread is cleanly terminated and joined when the last guard is dropped.
///
/// # Limits
///
/// The system supports a maximum of 255 concurrent `LogGuard` instances; creating one more
/// panics.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Clone of the global log sender, used on drop to send `Flush` or `Close` to the
    /// logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
662
663impl LogGuard {
664    /// Creates a new [`LogGuard`] instance from the global logger.
665    ///
666    /// Returns `None` if logging has not been initialized.
667    ///
668    /// # Panics
669    ///
670    /// Panics if the number of active LogGuards would exceed 255.
671    #[must_use]
672    pub fn new() -> Option<Self> {
673        LOGGER_TX.get().map(|tx| {
674            LOGGING_GUARDS_ACTIVE
675                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
676                    if count == u8::MAX {
677                        None // Reject the update if we're at the limit
678                    } else {
679                        Some(count + 1)
680                    }
681                })
682                .expect("Maximum number of active LogGuards (255) exceeded");
683
684            Self { tx: tx.clone() }
685        })
686    }
687}
688
689impl Drop for LogGuard {
690    fn drop(&mut self) {
691        let previous_count = LOGGING_GUARDS_ACTIVE
692            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
693                if count == 0 {
694                    panic!("LogGuard reference count underflow");
695                }
696                Some(count - 1)
697            })
698            .expect("Failed to decrement LogGuard count");
699
700        // Check if this was the last LogGuard - re-check after decrement to avoid race
701        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
702            // This is truly the last LogGuard, so we should close the logger and join the thread
703            // to ensure all log messages are written before the process terminates.
704            // Prevent any new log events from being accepted while shutting down.
705            LOGGING_BYPASSED.store(true, Ordering::SeqCst);
706
707            // Disable all log levels to reduce overhead on late calls
708            log::set_max_level(log::LevelFilter::Off);
709
710            // Ensure Close is delivered before joining (critical for shutdown)
711            let _ = self.tx.send(LogEvent::Close);
712
713            // Join the logging thread to ensure all pending logs are written
714            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
715                && let Some(handle) = handle_guard.take()
716            {
717                // Avoid self-join deadlock
718                if handle.thread().id() != std::thread::current().id() {
719                    let _ = handle.join();
720                }
721            }
722
723            // Reset LOGGING_INITIALIZED since the logging thread has terminated
724            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
725        } else {
726            // Other LogGuards are still active, just flush our logs
727            let _ = self.tx.send(LogEvent::Flush);
728        }
729    }
730}
731
732////////////////////////////////////////////////////////////////////////////////
733// Tests
734////////////////////////////////////////////////////////////////////////////////
735#[cfg(test)]
736mod tests {
737    use std::{collections::HashMap, thread::sleep, time::Duration};
738
739    use log::LevelFilter;
740    use nautilus_core::UUID4;
741    use nautilus_model::identifiers::TraderId;
742    use rstest::*;
743    use serde_json::Value;
744    use tempfile::tempdir;
745    use ustr::Ustr;
746
747    use super::*;
748    use crate::{
749        enums::LogColor,
750        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
751        testing::wait_until,
752    };
753
754    #[rstest]
755    fn log_message_serialization() {
756        let log_message = LogLine {
757            timestamp: UnixNanos::default(),
758            level: log::Level::Info,
759            color: LogColor::Normal,
760            component: Ustr::from("Portfolio"),
761            message: "This is a log message".to_string(),
762        };
763
764        let serialized_json = serde_json::to_string(&log_message).unwrap();
765        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
766
767        assert_eq!(deserialized_value["level"], "INFO");
768        assert_eq!(deserialized_value["component"], "Portfolio");
769        assert_eq!(deserialized_value["message"], "This is a log message");
770    }
771
772    #[rstest]
773    fn log_config_parsing() {
774        let config =
775            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
776                .unwrap();
777        assert_eq!(
778            config,
779            LoggerConfig {
780                stdout_level: LevelFilter::Info,
781                fileout_level: LevelFilter::Debug,
782                component_level: HashMap::from_iter(vec![(
783                    Ustr::from("RiskEngine"),
784                    LevelFilter::Error
785                )]),
786                log_components_only: false,
787                is_colored: true,
788                print_config: false,
789            }
790        );
791    }
792
793    #[rstest]
794    fn log_config_parsing2() {
795        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
796        assert_eq!(
797            config,
798            LoggerConfig {
799                stdout_level: LevelFilter::Warn,
800                fileout_level: LevelFilter::Error,
801                component_level: HashMap::new(),
802                log_components_only: false,
803                is_colored: true,
804                print_config: true,
805            }
806        );
807    }
808
809    #[rstest]
810    fn log_config_parsing_with_log_components_only() {
811        let config =
812            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
813        assert_eq!(
814            config,
815            LoggerConfig {
816                stdout_level: LevelFilter::Info,
817                fileout_level: LevelFilter::Off,
818                component_level: HashMap::from_iter(vec![(
819                    Ustr::from("RiskEngine"),
820                    LevelFilter::Debug
821                )]),
822                log_components_only: true,
823                is_colored: true,
824                print_config: false,
825            }
826        );
827    }
828
829    // These tests need to run serially because they use global logging state
830    mod serial_tests {
831        use super::*;
832
833        #[rstest]
834        fn test_logging_to_file() {
835            let config = LoggerConfig {
836                fileout_level: LevelFilter::Debug,
837                ..Default::default()
838            };
839
840            let temp_dir = tempdir().expect("Failed to create temporary directory");
841            let file_config = FileWriterConfig {
842                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
843                ..Default::default()
844            };
845
846            let log_guard = Logger::init_with_config(
847                TraderId::from("TRADER-001"),
848                UUID4::new(),
849                config,
850                file_config,
851            );
852
853            logging_clock_set_static_mode();
854            logging_clock_set_static_time(1_650_000_000_000_000);
855
856            log::info!(
857                component = "RiskEngine";
858                "This is a test."
859            );
860
861            let mut log_contents = String::new();
862
863            wait_until(
864                || {
865                    std::fs::read_dir(&temp_dir)
866                        .expect("Failed to read directory")
867                        .filter_map(Result::ok)
868                        .any(|entry| entry.path().is_file())
869                },
870                Duration::from_secs(3),
871            );
872
873            drop(log_guard); // Ensure log buffers are flushed
874
875            wait_until(
876                || {
877                    let log_file_path = std::fs::read_dir(&temp_dir)
878                        .expect("Failed to read directory")
879                        .filter_map(Result::ok)
880                        .find(|entry| entry.path().is_file())
881                        .expect("No files found in directory")
882                        .path();
883                    dbg!(&log_file_path);
884                    log_contents = std::fs::read_to_string(log_file_path)
885                        .expect("Error while reading log file");
886                    !log_contents.is_empty()
887                },
888                Duration::from_secs(3),
889            );
890
891            assert_eq!(
892                log_contents,
893                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
894            );
895        }
896
897        #[rstest]
898        fn test_shutdown_drains_backlog_tail() {
899            // Configure file logging at Info level
900            let config = LoggerConfig {
901                stdout_level: LevelFilter::Off,
902                fileout_level: LevelFilter::Info,
903                ..Default::default()
904            };
905
906            let temp_dir = tempdir().expect("Failed to create temporary directory");
907            let file_config = FileWriterConfig {
908                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
909                ..Default::default()
910            };
911
912            let log_guard = Logger::init_with_config(
913                TraderId::from("TRADER-TAIL"),
914                UUID4::new(),
915                config,
916                file_config,
917            )
918            .expect("Failed to initialize logger");
919
920            // Use static time for reproducibility
921            logging_clock_set_static_mode();
922            logging_clock_set_static_time(1_700_000_000_000_000);
923
924            // Enqueue a known number of messages synchronously
925            const N: usize = 1000;
926            for i in 0..N {
927                log::info!(component = "TailDrain"; "BacklogTest {i}");
928            }
929
930            // Drop guard to trigger shutdown (bypass + close + drain)
931            drop(log_guard);
932
933            // Wait until the file exists and contains at least N lines with our marker
934            let mut count = 0usize;
935            wait_until(
936                || {
937                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
938                        .expect("Failed to read directory")
939                        .filter_map(Result::ok)
940                        .find(|entry| entry.path().is_file())
941                    {
942                        let log_file_path = log_file.path();
943                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
944                            count = contents
945                                .lines()
946                                .filter(|l| l.contains("BacklogTest "))
947                                .count();
948                            count >= N
949                        } else {
950                            false
951                        }
952                    } else {
953                        false
954                    }
955                },
956                Duration::from_secs(5),
957            );
958
959            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
960        }
961
962        #[rstest]
963        fn test_log_component_level_filtering() {
964            let config =
965                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
966
967            let temp_dir = tempdir().expect("Failed to create temporary directory");
968            let file_config = FileWriterConfig {
969                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
970                ..Default::default()
971            };
972
973            let log_guard = Logger::init_with_config(
974                TraderId::from("TRADER-001"),
975                UUID4::new(),
976                config,
977                file_config,
978            );
979
980            logging_clock_set_static_mode();
981            logging_clock_set_static_time(1_650_000_000_000_000);
982
983            log::info!(
984                component = "RiskEngine";
985                "This is a test."
986            );
987
988            drop(log_guard); // Ensure log buffers are flushed
989
990            wait_until(
991                || {
992                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
993                        .expect("Failed to read directory")
994                        .filter_map(Result::ok)
995                        .find(|entry| entry.path().is_file())
996                    {
997                        let log_file_path = log_file.path();
998                        let log_contents = std::fs::read_to_string(log_file_path)
999                            .expect("Error while reading log file");
1000                        !log_contents.contains("RiskEngine")
1001                    } else {
1002                        false
1003                    }
1004                },
1005                Duration::from_secs(3),
1006            );
1007
1008            assert!(
1009                std::fs::read_dir(&temp_dir)
1010                    .expect("Failed to read directory")
1011                    .filter_map(Result::ok)
1012                    .any(|entry| entry.path().is_file()),
1013                "Log file exists"
1014            );
1015        }
1016
1017        #[rstest]
1018        fn test_logging_to_file_in_json_format() {
1019            let config =
1020                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1021                    .unwrap();
1022
1023            let temp_dir = tempdir().expect("Failed to create temporary directory");
1024            let file_config = FileWriterConfig {
1025                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1026                file_format: Some("json".to_string()),
1027                ..Default::default()
1028            };
1029
1030            let log_guard = Logger::init_with_config(
1031                TraderId::from("TRADER-001"),
1032                UUID4::new(),
1033                config,
1034                file_config,
1035            );
1036
1037            logging_clock_set_static_mode();
1038            logging_clock_set_static_time(1_650_000_000_000_000);
1039
1040            log::info!(
1041                component = "RiskEngine";
1042                "This is a test."
1043            );
1044
1045            let mut log_contents = String::new();
1046
1047            drop(log_guard); // Ensure log buffers are flushed
1048
1049            wait_until(
1050                || {
1051                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1052                        .expect("Failed to read directory")
1053                        .filter_map(Result::ok)
1054                        .find(|entry| entry.path().is_file())
1055                    {
1056                        let log_file_path = log_file.path();
1057                        log_contents = std::fs::read_to_string(log_file_path)
1058                            .expect("Error while reading log file");
1059                        !log_contents.is_empty()
1060                    } else {
1061                        false
1062                    }
1063                },
1064                Duration::from_secs(3),
1065            );
1066
1067            assert_eq!(
1068                log_contents,
1069                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
1070            );
1071        }
1072
1073        #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
1074        #[rstest]
1075        fn test_file_rotation_and_backup_limits() {
1076            // Create a temporary directory for log files
1077            let temp_dir = tempdir().expect("Failed to create temporary directory");
1078            let dir_path = temp_dir.path().to_str().unwrap().to_string();
1079
1080            // Configure a small max file size to trigger rotation quickly
1081            let max_backups = 3;
1082            let max_file_size = 100;
1083            let file_config = FileWriterConfig {
1084                directory: Some(dir_path.clone()),
1085                file_name: None,
1086                file_format: Some("log".to_string()),
1087                file_rotate: Some((max_file_size, max_backups).into()), // 100 bytes max size, 3 max backups
1088            };
1089
1090            // Create the file writer
1091            let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
1092            let log_guard = Logger::init_with_config(
1093                TraderId::from("TRADER-001"),
1094                UUID4::new(),
1095                config,
1096                file_config,
1097            );
1098
1099            log::info!(
1100                component = "Test";
1101                "Test log message with enough content to exceed our small max file size limit"
1102            );
1103
1104            sleep(Duration::from_millis(100));
1105
1106            // Count the number of log files in the directory
1107            let files: Vec<_> = std::fs::read_dir(&dir_path)
1108                .expect("Failed to read directory")
1109                .filter_map(Result::ok)
1110                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1111                .collect();
1112
1113            // We should have multiple files due to rotation
1114            assert_eq!(files.len(), 1);
1115
1116            log::info!(
1117                component = "Test";
1118                "Test log message with enough content to exceed our small max file size limit"
1119            );
1120
1121            sleep(Duration::from_millis(100));
1122
1123            // Count the number of log files in the directory
1124            let files: Vec<_> = std::fs::read_dir(&dir_path)
1125                .expect("Failed to read directory")
1126                .filter_map(Result::ok)
1127                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1128                .collect();
1129
1130            // We should have multiple files due to rotation
1131            assert_eq!(files.len(), 2);
1132
1133            for _ in 0..5 {
1134                // Write enough data to trigger a few rotations
1135                log::info!(
1136                component = "Test";
1137                "Test log message with enough content to exceed our small max file size limit"
1138                );
1139
1140                sleep(Duration::from_millis(100));
1141            }
1142
1143            // Count the number of log files in the directory
1144            let files: Vec<_> = std::fs::read_dir(&dir_path)
1145                .expect("Failed to read directory")
1146                .filter_map(Result::ok)
1147                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1148                .collect();
1149
1150            // We should have at most max_backups + 1 files (current file + backups)
1151            assert!(
1152                files.len() == max_backups as usize + 1,
1153                "Expected at most {} log files, found {}",
1154                max_backups,
1155                files.len()
1156            );
1157
1158            // Clean up
1159            drop(log_guard);
1160            drop(temp_dir);
1161        }
1162    }
1163}