nautilus_common/logging/
logger.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env,
19    fmt::Display,
20    str::FromStr,
21    sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27    kv::{ToValue, Value},
28    set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31    UUID4, UnixNanos,
32    datetime::unix_nanos_to_iso8601,
33    time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41    enums::{LogColor, LogLevel},
42    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
/// Name used for the logging thread.
const LOGGING: &str = "logging";
/// Key under which a log record's color is passed via the `log` key-value API.
const KV_COLOR: &str = "color";
/// Key under which a log record's component is passed via the `log` key-value API.
const KV_COMPONENT: &str = "component";
48
/// Global log sender which allows multiple log guards per process.
///
/// Set exactly once during initialization; additional [`LogGuard`]s clone this sender.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Global handle to the logging thread - only one thread exists per process.
///
/// Taken and joined when the last guard is dropped or on explicit shutdown.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
/// Configuration for the [`Logger`]: sink log levels, per-component filters,
/// and output styling options.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum log level to write to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum log level to write to file (disabled is `Off`).
    pub fileout_level: LevelFilter,
    /// Per-component log levels, allowing finer-grained control.
    component_level: HashMap<Ustr, LevelFilter>,
    /// If only components with explicit component-level filters should be logged.
    pub log_components_only: bool,
    /// If logger is using ANSI color codes.
    pub is_colored: bool,
    /// If the configuration should be printed to stdout at initialization.
    pub print_config: bool,
}
74
75impl Default for LoggerConfig {
76    /// Creates a new default [`LoggerConfig`] instance.
77    fn default() -> Self {
78        Self {
79            stdout_level: LevelFilter::Info,
80            fileout_level: LevelFilter::Off,
81            component_level: HashMap::new(),
82            log_components_only: false,
83            is_colored: true,
84            print_config: false,
85        }
86    }
87}
88
89impl LoggerConfig {
90    /// Creates a new [`LoggerConfig`] instance.
91    #[must_use]
92    pub const fn new(
93        stdout_level: LevelFilter,
94        fileout_level: LevelFilter,
95        component_level: HashMap<Ustr, LevelFilter>,
96        log_components_only: bool,
97        is_colored: bool,
98        print_config: bool,
99    ) -> Self {
100        Self {
101            stdout_level,
102            fileout_level,
103            component_level,
104            log_components_only,
105            is_colored,
106            print_config,
107        }
108    }
109
110    /// # Errors
111    ///
112    /// Returns an error if the spec string is invalid.
113    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
114        let mut config = Self::default();
115        for kv in spec.split(';') {
116            let kv = kv.trim();
117            if kv.is_empty() {
118                continue;
119            }
120            let kv_lower = kv.to_lowercase(); // For case-insensitive comparison
121
122            if kv_lower == "log_components_only" {
123                config.log_components_only = true;
124            } else if kv_lower == "is_colored" {
125                config.is_colored = true;
126            } else if kv_lower == "print_config" {
127                config.print_config = true;
128            } else {
129                let parts: Vec<&str> = kv.split('=').collect();
130                if parts.len() != 2 {
131                    anyhow::bail!("Invalid spec pair: {}", kv);
132                }
133                let k = parts[0].trim(); // Trim key
134                let v = parts[1].trim(); // Trim value
135                let lvl = LevelFilter::from_str(v)
136                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
137                let k_lower = k.to_lowercase(); // Case-insensitive key matching
138                match k_lower.as_str() {
139                    "stdout" => config.stdout_level = lvl,
140                    "fileout" => config.fileout_level = lvl,
141                    _ => {
142                        config.component_level.insert(Ustr::from(k), lvl);
143                    }
144                }
145            }
146        }
147        Ok(config)
148    }
149
150    /// Retrieves the logger configuration from the "`NAUTILUS_LOG`" environment variable.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the variable is unset or invalid.
155    pub fn from_env() -> anyhow::Result<Self> {
156        let spec = env::var("NAUTILUS_LOG")?;
157        Self::from_spec(&spec)
158    }
159}
160
/// A high-performance logger utilizing a MPSC channel under the hood.
///
/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
/// sent via an MPSC channel. Implements the standard [`log::Log`] trait so it can be
/// registered as the global `log` implementation.
#[derive(Debug)]
pub struct Logger {
    /// Configuration for logging levels and behavior.
    pub config: LoggerConfig,
    /// Transmitter for sending log events to the 'logging' thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
173
/// Represents a type of log event.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line event.
    Log(LogLine),
    /// A command to flush all logger buffers.
    Flush,
    /// A command to close the logger (terminates the logging thread's event loop).
    Close,
}
184
/// Represents a log event which includes a message.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp for the event (UNIX epoch, nanoseconds).
    pub timestamp: UnixNanos,
    /// The log level for the event.
    pub level: Level,
    /// The color for the log message content.
    pub color: LogColor,
    /// The Nautilus system component the log event originated from.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}
199
200impl Display for LogLine {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
203    }
204}
205
/// A wrapper around a log line that provides formatted and cached representations.
///
/// This struct contains a log line and provides various formatted versions
/// of it, such as plain string, colored string, and JSON. It also caches the
/// results for repeated calls, optimizing performance when the same message
/// needs to be logged multiple times in different formats.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line that contains the log data.
    line: LogLine,
    /// Cached plain string representation of the log line (built on first use).
    cache: Option<String>,
    /// Cached colored string representation of the log line (built on first use).
    colored: Option<String>,
    /// The ID of the trader associated with this log event.
    trader_id: Ustr,
}
223
224impl LogLineWrapper {
225    /// Creates a new [`LogLineWrapper`] instance.
226    #[must_use]
227    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
228        Self {
229            line,
230            cache: None,
231            colored: None,
232            trader_id,
233        }
234    }
235
236    /// Returns the plain log message string, caching the result.
237    ///
238    /// This method constructs the log line format and caches it for repeated calls. Useful when the
239    /// same log message needs to be printed multiple times.
240    pub fn get_string(&mut self) -> &str {
241        self.cache.get_or_insert_with(|| {
242            format!(
243                "{} [{}] {}.{}: {}\n",
244                unix_nanos_to_iso8601(self.line.timestamp),
245                self.line.level,
246                self.trader_id,
247                &self.line.component,
248                &self.line.message,
249            )
250        })
251    }
252
253    /// Returns the colored log message string, caching the result.
254    ///
255    /// This method constructs the colored log line format and caches the result
256    /// for repeated calls, providing the message with ANSI color codes if the
257    /// logger is configured to use colors.
258    pub fn get_colored(&mut self) -> &str {
259        self.colored.get_or_insert_with(|| {
260            format!(
261                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
262                unix_nanos_to_iso8601(self.line.timestamp),
263                &self.line.color.as_ansi(),
264                self.line.level,
265                self.trader_id,
266                &self.line.component,
267                &self.line.message,
268            )
269        })
270    }
271
272    /// Returns the log message as a JSON string.
273    ///
274    /// This method serializes the log line and its associated metadata
275    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
276    /// for structured logging or when logs need to be stored in a JSON format.
277    /// # Panics
278    ///
279    /// Panics if serialization of the log event to JSON fails.
280    #[must_use]
281    pub fn get_json(&self) -> String {
282        let json_string =
283            serde_json::to_string(&self).expect("Error serializing log event to string");
284        format!("{json_string}\n")
285    }
286}
287
288impl Serialize for LogLineWrapper {
289    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290    where
291        S: Serializer,
292    {
293        let mut json_obj = IndexMap::new();
294        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
295        json_obj.insert("timestamp".to_string(), timestamp);
296        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
297        json_obj.insert("level".to_string(), self.line.level.to_string());
298        json_obj.insert("color".to_string(), self.line.color.to_string());
299        json_obj.insert("component".to_string(), self.line.component.to_string());
300        json_obj.insert("message".to_string(), self.line.message.clone());
301
302        json_obj.serialize(serializer)
303    }
304}
305
306impl Log for Logger {
307    fn enabled(&self, metadata: &log::Metadata) -> bool {
308        !LOGGING_BYPASSED.load(Ordering::Relaxed)
309            && (metadata.level() == Level::Error
310                || metadata.level() <= self.config.stdout_level
311                || metadata.level() <= self.config.fileout_level)
312    }
313
314    fn log(&self, record: &log::Record) {
315        if self.enabled(record.metadata()) {
316            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
317                get_atomic_clock_realtime().get_time_ns()
318            } else {
319                get_atomic_clock_static().get_time_ns()
320            };
321            let level = record.level();
322            let key_values = record.key_values();
323            let color: LogColor = key_values
324                .get(KV_COLOR.into())
325                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
326                .unwrap_or(level.into());
327            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
328                || Ustr::from(record.metadata().target()),
329                |v| Ustr::from(&v.to_string()),
330            );
331
332            let line = LogLine {
333                timestamp,
334                level,
335                color,
336                component,
337                message: format!("{}", record.args()),
338            };
339            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
340                eprintln!("Error sending log event (receiver closed): {line}");
341            }
342        }
343    }
344
345    fn flush(&self) {
346        // Don't attempt to flush if we're already bypassed/shutdown
347        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
348            return;
349        }
350
351        if let Err(e) = self.tx.send(LogEvent::Flush) {
352            eprintln!("Error sending flush log event: {e}");
353        }
354    }
355}
356
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logger based on the `NAUTILUS_LOG` environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the environment variable or parsing the configuration fails.
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Initializes the logger with the given `LoggerConfig` and `FileWriterConfig`,
    /// registering it as the global `log` implementation and spawning the logging thread.
    ///
    /// # Errors
    ///
    /// Returns an error if the logger fails to register or initialize the background thread.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");
    /// let file_config = FileWriterConfig::default();
    /// let log_guard = Logger::init_with_config(trader_id, instance_id, config, file_config);
    /// ```
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        set_boxed_logger(Box::new(logger))?;

        // Store the sender globally so additional guards can be created
        if LOGGER_TX.set(tx).is_err() {
            debug_assert!(
                false,
                "LOGGER_TX already set - re-initialization not supported"
            );
        }

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        // Store the handle globally
        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
            debug_assert!(
                handle_guard.is_none(),
                "LOGGER_HANDLE already set - re-initialization not supported"
            );
            *handle_guard = Some(handle);
        }

        // Let everything through the `log` facade; per-sink filtering happens
        // in the logging thread and writers
        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        LogGuard::new()
            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
    }

    /// Runs the log event loop on the dedicated logging thread.
    ///
    /// Receives [`LogEvent`]s until a `Close` event arrives or the channel
    /// disconnects, writing lines to stdout/stderr and (optionally) a file.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            log_components_only,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        // Set up std I/O buffers
        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // Conditionally create file writer based on fileout_level
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
        };

        // Applies component-level filtering, then writes the event to each enabled sink
        let process_event = |event: LogEvent,
                             stdout_writer: &mut StdoutWriter,
                             stderr_writer: &mut StderrWriter,
                             file_writer_opt: &mut Option<FileWriter>| {
            match event {
                LogEvent::Log(line) => {
                    let component_filter_level = component_level.get(&line.component);

                    // Drop lines without an explicit component filter when restricted
                    if log_components_only && component_filter_level.is_none() {
                        return;
                    }

                    // Drop lines above the component's configured maximum level
                    if let Some(&filter_level) = component_filter_level
                        && line.level > filter_level
                    {
                        return;
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(file_writer) = file_writer_opt
                        && file_writer.enabled(&wrapper.line)
                    {
                        if file_writer.json_format {
                            file_writer.write(&wrapper.get_json());
                        } else {
                            file_writer.write(wrapper.get_string());
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // Close handled in the main loop; ignore here.
                }
            }
        };

        // Continue to receive and handle log events until channel is hung up
        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(_) | LogEvent::Flush => process_event(
                    event,
                    &mut stdout_writer,
                    &mut stderr_writer,
                    &mut file_writer_opt,
                ),
                LogEvent::Close => {
                    // First flush what's been written so far
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    // Drain any remaining events that may have raced with shutdown
                    // This ensures logs enqueued just before/around shutdown aren't lost.
                    while let Ok(evt) = rx.try_recv() {
                        match evt {
                            LogEvent::Close => (), // ignore extra Close events
                            _ => process_event(
                                evt,
                                &mut stdout_writer,
                                &mut stderr_writer,
                                &mut file_writer_opt,
                            ),
                        }
                    }

                    // Final flush after draining
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}
584
585/// Gracefully shuts down the logging subsystem.
586///
587/// Performs the same shutdown sequence as dropping the last `LogGuard`, but can be called
588/// explicitly for deterministic shutdown timing (e.g., testing or Windows Python applications).
589///
590/// # Safety
591///
592/// Safe to call multiple times. Thread join is skipped if called from the logging thread.
593pub(crate) fn shutdown_graceful() {
594    // Prevent further logging
595    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
596    log::set_max_level(log::LevelFilter::Off);
597
598    // Signal Close if the sender exists
599    if let Some(tx) = LOGGER_TX.get() {
600        let _ = tx.send(LogEvent::Close);
601    }
602
603    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
604        && let Some(handle) = handle_guard.take()
605        && handle.thread().id() != std::thread::current().id()
606    {
607        let _ = handle.join();
608    }
609
610    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
611}
612
613pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
614    let color = Value::from(color as u8);
615
616    match level {
617        LogLevel::Off => {}
618        LogLevel::Trace => {
619            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
620        }
621        LogLevel::Debug => {
622            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
623        }
624        LogLevel::Info => {
625            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
626        }
627        LogLevel::Warning => {
628            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
629        }
630        LogLevel::Error => {
631            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
632        }
633    }
634}
635
/// A guard that manages the lifecycle of the logging subsystem.
///
/// `LogGuard` ensures the logging thread remains active while instances exist and properly
/// terminates when all guards are dropped. The system uses reference counting to track active
/// guards - when the last `LogGuard` is dropped, the logging thread is joined to ensure all
/// pending log messages are written before the process terminates.
///
/// # Reference Counting
///
/// The logging system maintains a global atomic counter of active `LogGuard` instances. This
/// ensures that:
/// - The logging thread remains active as long as at least one `LogGuard` exists.
/// - All log messages are properly flushed when intermediate guards are dropped.
/// - The logging thread is cleanly terminated and joined when the last guard is dropped.
///
/// # Shutdown Behavior
///
/// When the last guard is dropped, the logging thread is signaled to close, drains pending
/// messages, and is joined to ensure all logs are written before process termination.
///
/// **Python on Windows:** Non-deterministic GC order during interpreter shutdown can
/// occasionally prevent proper thread join, resulting in truncated logs.
///
/// # Limits
///
/// The system supports a maximum of 255 concurrent `LogGuard` instances.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to signal `Flush`/`Close` events to the logging thread on drop.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
670
671impl LogGuard {
672    /// Creates a new [`LogGuard`] instance from the global logger.
673    ///
674    /// Returns `None` if logging has not been initialized.
675    ///
676    /// # Panics
677    ///
678    /// Panics if the number of active LogGuards would exceed 255.
679    #[must_use]
680    pub fn new() -> Option<Self> {
681        LOGGER_TX.get().map(|tx| {
682            LOGGING_GUARDS_ACTIVE
683                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
684                    if count == u8::MAX {
685                        None // Reject the update if we're at the limit
686                    } else {
687                        Some(count + 1)
688                    }
689                })
690                .expect("Maximum number of active LogGuards (255) exceeded");
691
692            Self { tx: tx.clone() }
693        })
694    }
695}
696
impl Drop for LogGuard {
    /// Handles cleanup when a `LogGuard` is dropped.
    ///
    /// Sends `Flush` if other guards remain active, otherwise sends `Close`, joins the
    /// logging thread, and resets the subsystem state.
    fn drop(&mut self) {
        // Decrement the active guard count; underflow indicates a bookkeeping bug
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                if count == 0 {
                    panic!("LogGuard reference count underflow");
                }
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // Check if this was the last LogGuard - re-check after decrement to avoid race
        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
            // This is truly the last LogGuard, so we should close the logger and join the thread
            // to ensure all log messages are written before the process terminates.
            // Prevent any new log events from being accepted while shutting down.
            LOGGING_BYPASSED.store(true, Ordering::SeqCst);

            // Disable all log levels to reduce overhead on late calls
            log::set_max_level(log::LevelFilter::Off);

            // Ensure Close is delivered before joining (critical for shutdown)
            let _ = self.tx.send(LogEvent::Close);

            // Join the logging thread to ensure all pending logs are written
            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
                && let Some(handle) = handle_guard.take()
            {
                // Avoid self-join deadlock
                if handle.thread().id() != std::thread::current().id() {
                    let _ = handle.join();
                }
            }

            // Reset LOGGING_INITIALIZED since the logging thread has terminated
            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
        } else {
            // Other LogGuards are still active, just flush our logs
            let _ = self.tx.send(LogEvent::Flush);
        }
    }
}
743
744////////////////////////////////////////////////////////////////////////////////
745// Tests
746////////////////////////////////////////////////////////////////////////////////
747#[cfg(test)]
748mod tests {
749    use std::{collections::HashMap, thread::sleep, time::Duration};
750
751    use log::LevelFilter;
752    use nautilus_core::UUID4;
753    use nautilus_model::identifiers::TraderId;
754    use rstest::*;
755    use serde_json::Value;
756    use tempfile::tempdir;
757    use ustr::Ustr;
758
759    use super::*;
760    use crate::{
761        enums::LogColor,
762        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
763        testing::wait_until,
764    };
765
766    #[rstest]
767    fn log_message_serialization() {
768        let log_message = LogLine {
769            timestamp: UnixNanos::default(),
770            level: log::Level::Info,
771            color: LogColor::Normal,
772            component: Ustr::from("Portfolio"),
773            message: "This is a log message".to_string(),
774        };
775
776        let serialized_json = serde_json::to_string(&log_message).unwrap();
777        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
778
779        assert_eq!(deserialized_value["level"], "INFO");
780        assert_eq!(deserialized_value["component"], "Portfolio");
781        assert_eq!(deserialized_value["message"], "This is a log message");
782    }
783
784    #[rstest]
785    fn log_config_parsing() {
786        let config =
787            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
788                .unwrap();
789        assert_eq!(
790            config,
791            LoggerConfig {
792                stdout_level: LevelFilter::Info,
793                fileout_level: LevelFilter::Debug,
794                component_level: HashMap::from_iter(vec![(
795                    Ustr::from("RiskEngine"),
796                    LevelFilter::Error
797                )]),
798                log_components_only: false,
799                is_colored: true,
800                print_config: false,
801            }
802        );
803    }
804
805    #[rstest]
806    fn log_config_parsing2() {
807        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
808        assert_eq!(
809            config,
810            LoggerConfig {
811                stdout_level: LevelFilter::Warn,
812                fileout_level: LevelFilter::Error,
813                component_level: HashMap::new(),
814                log_components_only: false,
815                is_colored: true,
816                print_config: true,
817            }
818        );
819    }
820
821    #[rstest]
822    fn log_config_parsing_with_log_components_only() {
823        let config =
824            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
825        assert_eq!(
826            config,
827            LoggerConfig {
828                stdout_level: LevelFilter::Info,
829                fileout_level: LevelFilter::Off,
830                component_level: HashMap::from_iter(vec![(
831                    Ustr::from("RiskEngine"),
832                    LevelFilter::Debug
833                )]),
834                log_components_only: true,
835                is_colored: true,
836                print_config: false,
837            }
838        );
839    }
840
841    // These tests need to run serially because they use global logging state
842    mod serial_tests {
843        use super::*;
844
845        #[rstest]
846        fn test_logging_to_file() {
847            let config = LoggerConfig {
848                fileout_level: LevelFilter::Debug,
849                ..Default::default()
850            };
851
852            let temp_dir = tempdir().expect("Failed to create temporary directory");
853            let file_config = FileWriterConfig {
854                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
855                ..Default::default()
856            };
857
858            let log_guard = Logger::init_with_config(
859                TraderId::from("TRADER-001"),
860                UUID4::new(),
861                config,
862                file_config,
863            );
864
865            logging_clock_set_static_mode();
866            logging_clock_set_static_time(1_650_000_000_000_000);
867
868            log::info!(
869                component = "RiskEngine";
870                "This is a test."
871            );
872
873            let mut log_contents = String::new();
874
875            wait_until(
876                || {
877                    std::fs::read_dir(&temp_dir)
878                        .expect("Failed to read directory")
879                        .filter_map(Result::ok)
880                        .any(|entry| entry.path().is_file())
881                },
882                Duration::from_secs(3),
883            );
884
885            drop(log_guard); // Ensure log buffers are flushed
886
887            wait_until(
888                || {
889                    let log_file_path = std::fs::read_dir(&temp_dir)
890                        .expect("Failed to read directory")
891                        .filter_map(Result::ok)
892                        .find(|entry| entry.path().is_file())
893                        .expect("No files found in directory")
894                        .path();
895                    log_contents = std::fs::read_to_string(log_file_path)
896                        .expect("Error while reading log file");
897                    !log_contents.is_empty()
898                },
899                Duration::from_secs(3),
900            );
901
902            assert_eq!(
903                log_contents,
904                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
905            );
906        }
907
908        #[rstest]
909        fn test_shutdown_drains_backlog_tail() {
910            // Configure file logging at Info level
911            let config = LoggerConfig {
912                stdout_level: LevelFilter::Off,
913                fileout_level: LevelFilter::Info,
914                ..Default::default()
915            };
916
917            let temp_dir = tempdir().expect("Failed to create temporary directory");
918            let file_config = FileWriterConfig {
919                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
920                ..Default::default()
921            };
922
923            let log_guard = Logger::init_with_config(
924                TraderId::from("TRADER-TAIL"),
925                UUID4::new(),
926                config,
927                file_config,
928            )
929            .expect("Failed to initialize logger");
930
931            // Use static time for reproducibility
932            logging_clock_set_static_mode();
933            logging_clock_set_static_time(1_700_000_000_000_000);
934
935            // Enqueue a known number of messages synchronously
936            const N: usize = 1000;
937            for i in 0..N {
938                log::info!(component = "TailDrain"; "BacklogTest {i}");
939            }
940
941            // Drop guard to trigger shutdown (bypass + close + drain)
942            drop(log_guard);
943
944            // Wait until the file exists and contains at least N lines with our marker
945            let mut count = 0usize;
946            wait_until(
947                || {
948                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
949                        .expect("Failed to read directory")
950                        .filter_map(Result::ok)
951                        .find(|entry| entry.path().is_file())
952                    {
953                        let log_file_path = log_file.path();
954                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
955                            count = contents
956                                .lines()
957                                .filter(|l| l.contains("BacklogTest "))
958                                .count();
959                            count >= N
960                        } else {
961                            false
962                        }
963                    } else {
964                        false
965                    }
966                },
967                Duration::from_secs(5),
968            );
969
970            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
971        }
972
973        #[rstest]
974        fn test_log_component_level_filtering() {
975            let config =
976                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
977
978            let temp_dir = tempdir().expect("Failed to create temporary directory");
979            let file_config = FileWriterConfig {
980                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
981                ..Default::default()
982            };
983
984            let log_guard = Logger::init_with_config(
985                TraderId::from("TRADER-001"),
986                UUID4::new(),
987                config,
988                file_config,
989            );
990
991            logging_clock_set_static_mode();
992            logging_clock_set_static_time(1_650_000_000_000_000);
993
994            log::info!(
995                component = "RiskEngine";
996                "This is a test."
997            );
998
999            drop(log_guard); // Ensure log buffers are flushed
1000
1001            wait_until(
1002                || {
1003                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1004                        .expect("Failed to read directory")
1005                        .filter_map(Result::ok)
1006                        .find(|entry| entry.path().is_file())
1007                    {
1008                        let log_file_path = log_file.path();
1009                        let log_contents = std::fs::read_to_string(log_file_path)
1010                            .expect("Error while reading log file");
1011                        !log_contents.contains("RiskEngine")
1012                    } else {
1013                        false
1014                    }
1015                },
1016                Duration::from_secs(3),
1017            );
1018
1019            assert!(
1020                std::fs::read_dir(&temp_dir)
1021                    .expect("Failed to read directory")
1022                    .filter_map(Result::ok)
1023                    .any(|entry| entry.path().is_file()),
1024                "Log file exists"
1025            );
1026        }
1027
1028        #[rstest]
1029        fn test_logging_to_file_in_json_format() {
1030            let config =
1031                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1032                    .unwrap();
1033
1034            let temp_dir = tempdir().expect("Failed to create temporary directory");
1035            let file_config = FileWriterConfig {
1036                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1037                file_format: Some("json".to_string()),
1038                ..Default::default()
1039            };
1040
1041            let log_guard = Logger::init_with_config(
1042                TraderId::from("TRADER-001"),
1043                UUID4::new(),
1044                config,
1045                file_config,
1046            );
1047
1048            logging_clock_set_static_mode();
1049            logging_clock_set_static_time(1_650_000_000_000_000);
1050
1051            log::info!(
1052                component = "RiskEngine";
1053                "This is a test."
1054            );
1055
1056            let mut log_contents = String::new();
1057
1058            drop(log_guard); // Ensure log buffers are flushed
1059
1060            wait_until(
1061                || {
1062                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1063                        .expect("Failed to read directory")
1064                        .filter_map(Result::ok)
1065                        .find(|entry| entry.path().is_file())
1066                    {
1067                        let log_file_path = log_file.path();
1068                        log_contents = std::fs::read_to_string(log_file_path)
1069                            .expect("Error while reading log file");
1070                        !log_contents.is_empty()
1071                    } else {
1072                        false
1073                    }
1074                },
1075                Duration::from_secs(3),
1076            );
1077
1078            assert_eq!(
1079                log_contents,
1080                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
1081            );
1082        }
1083
1084        #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
1085        #[rstest]
1086        fn test_file_rotation_and_backup_limits() {
1087            // Create a temporary directory for log files
1088            let temp_dir = tempdir().expect("Failed to create temporary directory");
1089            let dir_path = temp_dir.path().to_str().unwrap().to_string();
1090
1091            // Configure a small max file size to trigger rotation quickly
1092            let max_backups = 3;
1093            let max_file_size = 100;
1094            let file_config = FileWriterConfig {
1095                directory: Some(dir_path.clone()),
1096                file_name: None,
1097                file_format: Some("log".to_string()),
1098                file_rotate: Some((max_file_size, max_backups).into()), // 100 bytes max size, 3 max backups
1099            };
1100
1101            // Create the file writer
1102            let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
1103            let log_guard = Logger::init_with_config(
1104                TraderId::from("TRADER-001"),
1105                UUID4::new(),
1106                config,
1107                file_config,
1108            );
1109
1110            log::info!(
1111                component = "Test";
1112                "Test log message with enough content to exceed our small max file size limit"
1113            );
1114
1115            sleep(Duration::from_millis(100));
1116
1117            // Count the number of log files in the directory
1118            let files: Vec<_> = std::fs::read_dir(&dir_path)
1119                .expect("Failed to read directory")
1120                .filter_map(Result::ok)
1121                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1122                .collect();
1123
1124            // We should have multiple files due to rotation
1125            assert_eq!(files.len(), 1);
1126
1127            log::info!(
1128                component = "Test";
1129                "Test log message with enough content to exceed our small max file size limit"
1130            );
1131
1132            sleep(Duration::from_millis(100));
1133
1134            // Count the number of log files in the directory
1135            let files: Vec<_> = std::fs::read_dir(&dir_path)
1136                .expect("Failed to read directory")
1137                .filter_map(Result::ok)
1138                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1139                .collect();
1140
1141            // We should have multiple files due to rotation
1142            assert_eq!(files.len(), 2);
1143
1144            for _ in 0..5 {
1145                // Write enough data to trigger a few rotations
1146                log::info!(
1147                component = "Test";
1148                "Test log message with enough content to exceed our small max file size limit"
1149                );
1150
1151                sleep(Duration::from_millis(100));
1152            }
1153
1154            // Count the number of log files in the directory
1155            let files: Vec<_> = std::fs::read_dir(&dir_path)
1156                .expect("Failed to read directory")
1157                .filter_map(Result::ok)
1158                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1159                .collect();
1160
1161            // We should have at most max_backups + 1 files (current file + backups)
1162            assert!(
1163                files.len() == max_backups as usize + 1,
1164                "Expected at most {} log files, found {}",
1165                max_backups,
1166                files.len()
1167            );
1168
1169            // Clean up
1170            drop(log_guard);
1171            drop(temp_dir);
1172        }
1173    }
1174}