// nautilus_common/logging/logger.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env,
19    fmt::Display,
20    str::FromStr,
21    sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27    kv::{ToValue, Value},
28    set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31    UUID4, UnixNanos,
32    datetime::unix_nanos_to_iso8601,
33    time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41    enums::{LogColor, LogLevel},
42    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
/// Name assigned to the background logging thread.
const LOGGING: &str = "logging";
/// Structured key-value key under which an explicit log color is passed.
const KV_COLOR: &str = "color";
/// Structured key-value key under which the originating component is passed.
const KV_COMPONENT: &str = "component";

/// Global log sender which allows multiple log guards per process.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Global handle to the logging thread - only one thread exists per process.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
/// Configuration for logging levels and output behavior.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum log level to write to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum log level to write to file (disabled is `Off`).
    pub fileout_level: LevelFilter,
    /// Per-component log levels, allowing finer-grained control.
    /// Private: populated via [`LoggerConfig::new`] or [`LoggerConfig::from_spec`].
    component_level: HashMap<Ustr, LevelFilter>,
    /// If only components with explicit component-level filters should be logged.
    pub log_components_only: bool,
    /// If logger is using ANSI color codes.
    pub is_colored: bool,
    /// If the configuration should be printed to stdout at initialization.
    pub print_config: bool,
}
74
75impl Default for LoggerConfig {
76    /// Creates a new default [`LoggerConfig`] instance.
77    fn default() -> Self {
78        Self {
79            stdout_level: LevelFilter::Info,
80            fileout_level: LevelFilter::Off,
81            component_level: HashMap::new(),
82            log_components_only: false,
83            is_colored: true,
84            print_config: false,
85        }
86    }
87}
88
89impl LoggerConfig {
90    /// Creates a new [`LoggerConfig`] instance.
91    #[must_use]
92    pub const fn new(
93        stdout_level: LevelFilter,
94        fileout_level: LevelFilter,
95        component_level: HashMap<Ustr, LevelFilter>,
96        log_components_only: bool,
97        is_colored: bool,
98        print_config: bool,
99    ) -> Self {
100        Self {
101            stdout_level,
102            fileout_level,
103            component_level,
104            log_components_only,
105            is_colored,
106            print_config,
107        }
108    }
109
110    /// # Errors
111    ///
112    /// Returns an error if the spec string is invalid.
113    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
114        let mut config = Self::default();
115        for kv in spec.split(';') {
116            let kv = kv.trim();
117            if kv.is_empty() {
118                continue;
119            }
120            let kv_lower = kv.to_lowercase(); // For case-insensitive comparison
121
122            if kv_lower == "log_components_only" {
123                config.log_components_only = true;
124            } else if kv_lower == "is_colored" {
125                config.is_colored = true;
126            } else if kv_lower == "print_config" {
127                config.print_config = true;
128            } else {
129                let parts: Vec<&str> = kv.split('=').collect();
130                if parts.len() != 2 {
131                    anyhow::bail!("Invalid spec pair: {}", kv);
132                }
133                let k = parts[0].trim(); // Trim key
134                let v = parts[1].trim(); // Trim value
135                let lvl = LevelFilter::from_str(v)
136                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
137                let k_lower = k.to_lowercase(); // Case-insensitive key matching
138                match k_lower.as_str() {
139                    "stdout" => config.stdout_level = lvl,
140                    "fileout" => config.fileout_level = lvl,
141                    _ => {
142                        config.component_level.insert(Ustr::from(k), lvl);
143                    }
144                }
145            }
146        }
147        Ok(config)
148    }
149
150    /// Retrieves the logger configuration from the "`NAUTILUS_LOG`" environment variable.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the variable is unset or invalid.
155    pub fn from_env() -> anyhow::Result<Self> {
156        let spec = env::var("NAUTILUS_LOG")?;
157        Self::from_spec(&spec)
158    }
159}
160
/// A high-performance logger utilizing a MPSC channel under the hood.
///
/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
/// sent via an MPSC channel.
#[derive(Debug)]
pub struct Logger {
    /// Configuration for logging levels and behavior.
    pub config: LoggerConfig,
    /// Transmitter for sending log events to the 'logging' thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
173
/// Represents a type of log event.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line event.
    Log(LogLine),
    /// A command to flush all logger buffers.
    Flush,
    /// A command to close the logger; the logging thread drains pending events and exits.
    Close,
}
184
/// Represents a log event which includes a message.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp for the event (UNIX epoch nanoseconds).
    pub timestamp: UnixNanos,
    /// The log level for the event.
    pub level: Level,
    /// The color for the log message content.
    pub color: LogColor,
    /// The Nautilus system component the log event originated from.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}
199
200impl Display for LogLine {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
203    }
204}
205
/// A wrapper around a log line that provides formatted and cached representations.
///
/// This struct contains a log line and provides various formatted versions
/// of it, such as plain string, colored string, and JSON. It also caches the
/// results for repeated calls, optimizing performance when the same message
/// needs to be logged multiple times in different formats.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line that contains the log data.
    line: LogLine,
    /// Cached plain string representation of the log line.
    cache: Option<String>,
    /// Cached colored string representation of the log line.
    colored: Option<String>,
    /// The ID of the trader associated with this log event.
    trader_id: Ustr,
}
223
224impl LogLineWrapper {
225    /// Creates a new [`LogLineWrapper`] instance.
226    #[must_use]
227    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
228        Self {
229            line,
230            cache: None,
231            colored: None,
232            trader_id,
233        }
234    }
235
236    /// Returns the plain log message string, caching the result.
237    ///
238    /// This method constructs the log line format and caches it for repeated calls. Useful when the
239    /// same log message needs to be printed multiple times.
240    pub fn get_string(&mut self) -> &str {
241        self.cache.get_or_insert_with(|| {
242            format!(
243                "{} [{}] {}.{}: {}\n",
244                unix_nanos_to_iso8601(self.line.timestamp),
245                self.line.level,
246                self.trader_id,
247                &self.line.component,
248                &self.line.message,
249            )
250        })
251    }
252
253    /// Returns the colored log message string, caching the result.
254    ///
255    /// This method constructs the colored log line format and caches the result
256    /// for repeated calls, providing the message with ANSI color codes if the
257    /// logger is configured to use colors.
258    pub fn get_colored(&mut self) -> &str {
259        self.colored.get_or_insert_with(|| {
260            format!(
261                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
262                unix_nanos_to_iso8601(self.line.timestamp),
263                &self.line.color.as_ansi(),
264                self.line.level,
265                self.trader_id,
266                &self.line.component,
267                &self.line.message,
268            )
269        })
270    }
271
272    /// Returns the log message as a JSON string.
273    ///
274    /// This method serializes the log line and its associated metadata
275    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
276    /// for structured logging or when logs need to be stored in a JSON format.
277    /// # Panics
278    ///
279    /// Panics if serialization of the log event to JSON fails.
280    #[must_use]
281    pub fn get_json(&self) -> String {
282        let json_string =
283            serde_json::to_string(&self).expect("Error serializing log event to string");
284        format!("{json_string}\n")
285    }
286}
287
288impl Serialize for LogLineWrapper {
289    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290    where
291        S: Serializer,
292    {
293        let mut json_obj = IndexMap::new();
294        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
295        json_obj.insert("timestamp".to_string(), timestamp);
296        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
297        json_obj.insert("level".to_string(), self.line.level.to_string());
298        json_obj.insert("color".to_string(), self.line.color.to_string());
299        json_obj.insert("component".to_string(), self.line.component.to_string());
300        json_obj.insert("message".to_string(), self.line.message.to_string());
301
302        json_obj.serialize(serializer)
303    }
304}
305
impl Log for Logger {
    /// A record is enabled while logging is not bypassed and its level is `Error`
    /// or passes either the stdout or fileout filter (component-level filtering
    /// happens later, on the logging thread).
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    /// Converts an enabled `log::Record` into a [`LogLine`] and sends it to the
    /// logging thread over the MPSC channel.
    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            // Timestamp source depends on the global clock mode (realtime vs static)
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            // Optional explicit color passed as a structured KV (u64 truncated to u8);
            // falls back to a level-derived color when absent
            let color: LogColor = key_values
                .get(KV_COLOR.into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            // Component defaults to the record's target when not supplied via KV
            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            // If the receiver is gone (logging thread exited), surface the lost line on stderr
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        // Don't attempt to flush if we're already bypassed/shutdown
        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
            return;
        }

        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event: {e}");
        }
    }
}
356
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logger based on the `NAUTILUS_LOG` environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the environment variable or parsing the configuration fails.
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Initializes the logger with the given `LoggerConfig` and `FileWriterConfig`,
    /// registering it as the global `log` implementation and spawning the logging thread.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");
    /// let file_config = FileWriterConfig::default();
    /// let log_guard = Logger::init_with_config(trader_id, instance_id, config, file_config);
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the logger fails to register or initialize the background thread.
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        // Fails if a global logger was already registered
        set_boxed_logger(Box::new(logger))?;

        // Store the sender globally so additional guards can be created
        if LOGGER_TX.set(tx.clone()).is_err() {
            debug_assert!(
                false,
                "LOGGER_TX already set - re-initialization not supported"
            );
        }

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        // Spawn the dedicated logging thread which owns the channel receiver
        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        // Store the handle globally
        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
            debug_assert!(
                handle_guard.is_none(),
                "LOGGER_HANDLE already set - re-initialization not supported"
            );
            *handle_guard = Some(handle);
        }

        // Let everything through the facade; fine-grained filtering happens per sink
        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        LogGuard::new()
            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
    }

    /// Event loop run on the logging thread: receives [`LogEvent`]s, applies
    /// component-level filtering, and writes to the stdout/stderr/file sinks
    /// until a `Close` event (or channel hang-up) terminates it.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            log_components_only,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        // Set up std I/O buffers
        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // Conditionally create file writer based on fileout_level
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(
                trader_id.clone(),
                instance_id.clone(),
                file_config.clone(),
                fileout_level,
            )
        };

        // Fans a single Log/Flush event out to the writers; Close is handled by
        // the outer loop so shutdown draining stays in one place
        let process_event = |event: LogEvent,
                             stdout_writer: &mut StdoutWriter,
                             stderr_writer: &mut StderrWriter,
                             file_writer_opt: &mut Option<FileWriter>| {
            match event {
                LogEvent::Log(line) => {
                    let component_filter_level = component_level.get(&line.component);

                    // In components-only mode, drop lines without an explicit filter
                    if log_components_only && component_filter_level.is_none() {
                        return;
                    }

                    // Drop lines above their component's configured level
                    if let Some(&filter_level) = component_filter_level
                        && line.level > filter_level
                    {
                        return;
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(file_writer) = file_writer_opt
                        && file_writer.enabled(&wrapper.line)
                    {
                        if file_writer.json_format {
                            file_writer.write(&wrapper.get_json());
                        } else {
                            file_writer.write(wrapper.get_string());
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // Close handled in the main loop; ignore here.
                }
            }
        };

        // Continue to receive and handle log events until channel is hung up
        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(_) | LogEvent::Flush => process_event(
                    event,
                    &mut stdout_writer,
                    &mut stderr_writer,
                    &mut file_writer_opt,
                ),
                LogEvent::Close => {
                    // First flush what's been written so far
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    // Drain any remaining events that may have raced with shutdown
                    // This ensures logs enqueued just before/around shutdown aren't lost.
                    while let Ok(evt) = rx.try_recv() {
                        match evt {
                            LogEvent::Close => (), // ignore extra Close events
                            _ => process_event(
                                evt,
                                &mut stdout_writer,
                                &mut stderr_writer,
                                &mut file_writer_opt,
                            ),
                        }
                    }

                    // Final flush after draining
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}
589
590/// Gracefully shuts down the logging subsystem.
591///
592/// Performs the same shutdown sequence as dropping the last `LogGuard`, but can be called
593/// explicitly for deterministic shutdown timing (e.g., testing or Windows Python applications).
594///
595/// # Safety
596///
597/// Safe to call multiple times. Thread join is skipped if called from the logging thread.
598pub(crate) fn shutdown_graceful() {
599    // Prevent further logging
600    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
601    log::set_max_level(log::LevelFilter::Off);
602
603    // Signal Close if the sender exists
604    if let Some(tx) = LOGGER_TX.get() {
605        let _ = tx.send(LogEvent::Close);
606    }
607
608    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
609        && let Some(handle) = handle_guard.take()
610        && handle.thread().id() != std::thread::current().id()
611    {
612        let _ = handle.join();
613    }
614
615    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
616}
617
618pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
619    let color = Value::from(color as u8);
620
621    match level {
622        LogLevel::Off => {}
623        LogLevel::Trace => {
624            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
625        }
626        LogLevel::Debug => {
627            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
628        }
629        LogLevel::Info => {
630            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
631        }
632        LogLevel::Warning => {
633            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
634        }
635        LogLevel::Error => {
636            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
637        }
638    }
639}
640
/// A guard that manages the lifecycle of the logging subsystem.
///
/// `LogGuard` ensures the logging thread remains active while instances exist and properly
/// terminates when all guards are dropped. The system uses reference counting to track active
/// guards - when the last `LogGuard` is dropped, the logging thread is joined to ensure all
/// pending log messages are written before the process terminates.
///
/// # Reference Counting
///
/// The logging system maintains a global atomic counter of active `LogGuard` instances. This
/// ensures that:
/// - The logging thread remains active as long as at least one `LogGuard` exists.
/// - All log messages are properly flushed when intermediate guards are dropped.
/// - The logging thread is cleanly terminated and joined when the last guard is dropped.
///
/// # Shutdown Behavior
///
/// When the last guard is dropped, the logging thread is signaled to close, drains pending
/// messages, and is joined to ensure all logs are written before process termination.
///
/// **Python on Windows:** Non-deterministic GC order during interpreter shutdown can
/// occasionally prevent proper thread join, resulting in truncated logs.
///
/// # Limits
///
/// The system supports a maximum of 255 concurrent `LogGuard` instances.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to signal `Flush`/`Close` to the logging thread on drop.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
675
676impl LogGuard {
677    /// Creates a new [`LogGuard`] instance from the global logger.
678    ///
679    /// Returns `None` if logging has not been initialized.
680    ///
681    /// # Panics
682    ///
683    /// Panics if the number of active LogGuards would exceed 255.
684    #[must_use]
685    pub fn new() -> Option<Self> {
686        LOGGER_TX.get().map(|tx| {
687            LOGGING_GUARDS_ACTIVE
688                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
689                    if count == u8::MAX {
690                        None // Reject the update if we're at the limit
691                    } else {
692                        Some(count + 1)
693                    }
694                })
695                .expect("Maximum number of active LogGuards (255) exceeded");
696
697            Self { tx: tx.clone() }
698        })
699    }
700}
701
impl Drop for LogGuard {
    /// Handles cleanup when a `LogGuard` is dropped.
    ///
    /// Sends `Flush` if other guards remain active, otherwise sends `Close`, joins the
    /// logging thread, and resets the subsystem state.
    fn drop(&mut self) {
        // Decrement the global guard count; a zero count here is a bug (double drop)
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                if count == 0 {
                    panic!("LogGuard reference count underflow");
                }
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // Check if this was the last LogGuard - re-check after decrement to avoid race
        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
            // This is truly the last LogGuard, so we should close the logger and join the thread
            // to ensure all log messages are written before the process terminates.
            // Prevent any new log events from being accepted while shutting down.
            LOGGING_BYPASSED.store(true, Ordering::SeqCst);

            // Disable all log levels to reduce overhead on late calls
            log::set_max_level(log::LevelFilter::Off);

            // Ensure Close is delivered before joining (critical for shutdown)
            let _ = self.tx.send(LogEvent::Close);

            // Join the logging thread to ensure all pending logs are written
            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
                && let Some(handle) = handle_guard.take()
            {
                // Avoid self-join deadlock
                if handle.thread().id() != std::thread::current().id() {
                    let _ = handle.join();
                }
            }

            // Reset LOGGING_INITIALIZED since the logging thread has terminated
            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
        } else {
            // Other LogGuards are still active, just flush our logs
            let _ = self.tx.send(LogEvent::Flush);
        }
    }
}
748
749////////////////////////////////////////////////////////////////////////////////
750// Tests
751////////////////////////////////////////////////////////////////////////////////
752#[cfg(test)]
753mod tests {
754    use std::{collections::HashMap, thread::sleep, time::Duration};
755
756    use log::LevelFilter;
757    use nautilus_core::UUID4;
758    use nautilus_model::identifiers::TraderId;
759    use rstest::*;
760    use serde_json::Value;
761    use tempfile::tempdir;
762    use ustr::Ustr;
763
764    use super::*;
765    use crate::{
766        enums::LogColor,
767        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
768        testing::wait_until,
769    };
770
771    #[rstest]
772    fn log_message_serialization() {
773        let log_message = LogLine {
774            timestamp: UnixNanos::default(),
775            level: log::Level::Info,
776            color: LogColor::Normal,
777            component: Ustr::from("Portfolio"),
778            message: "This is a log message".to_string(),
779        };
780
781        let serialized_json = serde_json::to_string(&log_message).unwrap();
782        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
783
784        assert_eq!(deserialized_value["level"], "INFO");
785        assert_eq!(deserialized_value["component"], "Portfolio");
786        assert_eq!(deserialized_value["message"], "This is a log message");
787    }
788
789    #[rstest]
790    fn log_config_parsing() {
791        let config =
792            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
793                .unwrap();
794        assert_eq!(
795            config,
796            LoggerConfig {
797                stdout_level: LevelFilter::Info,
798                fileout_level: LevelFilter::Debug,
799                component_level: HashMap::from_iter(vec![(
800                    Ustr::from("RiskEngine"),
801                    LevelFilter::Error
802                )]),
803                log_components_only: false,
804                is_colored: true,
805                print_config: false,
806            }
807        );
808    }
809
810    #[rstest]
811    fn log_config_parsing2() {
812        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
813        assert_eq!(
814            config,
815            LoggerConfig {
816                stdout_level: LevelFilter::Warn,
817                fileout_level: LevelFilter::Error,
818                component_level: HashMap::new(),
819                log_components_only: false,
820                is_colored: true,
821                print_config: true,
822            }
823        );
824    }
825
826    #[rstest]
827    fn log_config_parsing_with_log_components_only() {
828        let config =
829            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
830        assert_eq!(
831            config,
832            LoggerConfig {
833                stdout_level: LevelFilter::Info,
834                fileout_level: LevelFilter::Off,
835                component_level: HashMap::from_iter(vec![(
836                    Ustr::from("RiskEngine"),
837                    LevelFilter::Debug
838                )]),
839                log_components_only: true,
840                is_colored: true,
841                print_config: false,
842            }
843        );
844    }
845
846    // These tests need to run serially because they use global logging state
847    mod serial_tests {
848        use super::*;
849
850        #[rstest]
851        fn test_logging_to_file() {
852            let config = LoggerConfig {
853                fileout_level: LevelFilter::Debug,
854                ..Default::default()
855            };
856
857            let temp_dir = tempdir().expect("Failed to create temporary directory");
858            let file_config = FileWriterConfig {
859                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
860                ..Default::default()
861            };
862
863            let log_guard = Logger::init_with_config(
864                TraderId::from("TRADER-001"),
865                UUID4::new(),
866                config,
867                file_config,
868            );
869
870            logging_clock_set_static_mode();
871            logging_clock_set_static_time(1_650_000_000_000_000);
872
873            log::info!(
874                component = "RiskEngine";
875                "This is a test."
876            );
877
878            let mut log_contents = String::new();
879
880            wait_until(
881                || {
882                    std::fs::read_dir(&temp_dir)
883                        .expect("Failed to read directory")
884                        .filter_map(Result::ok)
885                        .any(|entry| entry.path().is_file())
886                },
887                Duration::from_secs(3),
888            );
889
890            drop(log_guard); // Ensure log buffers are flushed
891
892            wait_until(
893                || {
894                    let log_file_path = std::fs::read_dir(&temp_dir)
895                        .expect("Failed to read directory")
896                        .filter_map(Result::ok)
897                        .find(|entry| entry.path().is_file())
898                        .expect("No files found in directory")
899                        .path();
900                    dbg!(&log_file_path);
901                    log_contents = std::fs::read_to_string(log_file_path)
902                        .expect("Error while reading log file");
903                    !log_contents.is_empty()
904                },
905                Duration::from_secs(3),
906            );
907
908            assert_eq!(
909                log_contents,
910                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
911            );
912        }
913
914        #[rstest]
915        fn test_shutdown_drains_backlog_tail() {
916            // Configure file logging at Info level
917            let config = LoggerConfig {
918                stdout_level: LevelFilter::Off,
919                fileout_level: LevelFilter::Info,
920                ..Default::default()
921            };
922
923            let temp_dir = tempdir().expect("Failed to create temporary directory");
924            let file_config = FileWriterConfig {
925                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
926                ..Default::default()
927            };
928
929            let log_guard = Logger::init_with_config(
930                TraderId::from("TRADER-TAIL"),
931                UUID4::new(),
932                config,
933                file_config,
934            )
935            .expect("Failed to initialize logger");
936
937            // Use static time for reproducibility
938            logging_clock_set_static_mode();
939            logging_clock_set_static_time(1_700_000_000_000_000);
940
941            // Enqueue a known number of messages synchronously
942            const N: usize = 1000;
943            for i in 0..N {
944                log::info!(component = "TailDrain"; "BacklogTest {i}");
945            }
946
947            // Drop guard to trigger shutdown (bypass + close + drain)
948            drop(log_guard);
949
950            // Wait until the file exists and contains at least N lines with our marker
951            let mut count = 0usize;
952            wait_until(
953                || {
954                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
955                        .expect("Failed to read directory")
956                        .filter_map(Result::ok)
957                        .find(|entry| entry.path().is_file())
958                    {
959                        let log_file_path = log_file.path();
960                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
961                            count = contents
962                                .lines()
963                                .filter(|l| l.contains("BacklogTest "))
964                                .count();
965                            count >= N
966                        } else {
967                            false
968                        }
969                    } else {
970                        false
971                    }
972                },
973                Duration::from_secs(5),
974            );
975
976            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
977        }
978
979        #[rstest]
980        fn test_log_component_level_filtering() {
981            let config =
982                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
983
984            let temp_dir = tempdir().expect("Failed to create temporary directory");
985            let file_config = FileWriterConfig {
986                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
987                ..Default::default()
988            };
989
990            let log_guard = Logger::init_with_config(
991                TraderId::from("TRADER-001"),
992                UUID4::new(),
993                config,
994                file_config,
995            );
996
997            logging_clock_set_static_mode();
998            logging_clock_set_static_time(1_650_000_000_000_000);
999
1000            log::info!(
1001                component = "RiskEngine";
1002                "This is a test."
1003            );
1004
1005            drop(log_guard); // Ensure log buffers are flushed
1006
1007            wait_until(
1008                || {
1009                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1010                        .expect("Failed to read directory")
1011                        .filter_map(Result::ok)
1012                        .find(|entry| entry.path().is_file())
1013                    {
1014                        let log_file_path = log_file.path();
1015                        let log_contents = std::fs::read_to_string(log_file_path)
1016                            .expect("Error while reading log file");
1017                        !log_contents.contains("RiskEngine")
1018                    } else {
1019                        false
1020                    }
1021                },
1022                Duration::from_secs(3),
1023            );
1024
1025            assert!(
1026                std::fs::read_dir(&temp_dir)
1027                    .expect("Failed to read directory")
1028                    .filter_map(Result::ok)
1029                    .any(|entry| entry.path().is_file()),
1030                "Log file exists"
1031            );
1032        }
1033
1034        #[rstest]
1035        fn test_logging_to_file_in_json_format() {
1036            let config =
1037                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1038                    .unwrap();
1039
1040            let temp_dir = tempdir().expect("Failed to create temporary directory");
1041            let file_config = FileWriterConfig {
1042                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1043                file_format: Some("json".to_string()),
1044                ..Default::default()
1045            };
1046
1047            let log_guard = Logger::init_with_config(
1048                TraderId::from("TRADER-001"),
1049                UUID4::new(),
1050                config,
1051                file_config,
1052            );
1053
1054            logging_clock_set_static_mode();
1055            logging_clock_set_static_time(1_650_000_000_000_000);
1056
1057            log::info!(
1058                component = "RiskEngine";
1059                "This is a test."
1060            );
1061
1062            let mut log_contents = String::new();
1063
1064            drop(log_guard); // Ensure log buffers are flushed
1065
1066            wait_until(
1067                || {
1068                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1069                        .expect("Failed to read directory")
1070                        .filter_map(Result::ok)
1071                        .find(|entry| entry.path().is_file())
1072                    {
1073                        let log_file_path = log_file.path();
1074                        log_contents = std::fs::read_to_string(log_file_path)
1075                            .expect("Error while reading log file");
1076                        !log_contents.is_empty()
1077                    } else {
1078                        false
1079                    }
1080                },
1081                Duration::from_secs(3),
1082            );
1083
1084            assert_eq!(
1085                log_contents,
1086                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
1087            );
1088        }
1089
1090        #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
1091        #[rstest]
1092        fn test_file_rotation_and_backup_limits() {
1093            // Create a temporary directory for log files
1094            let temp_dir = tempdir().expect("Failed to create temporary directory");
1095            let dir_path = temp_dir.path().to_str().unwrap().to_string();
1096
1097            // Configure a small max file size to trigger rotation quickly
1098            let max_backups = 3;
1099            let max_file_size = 100;
1100            let file_config = FileWriterConfig {
1101                directory: Some(dir_path.clone()),
1102                file_name: None,
1103                file_format: Some("log".to_string()),
1104                file_rotate: Some((max_file_size, max_backups).into()), // 100 bytes max size, 3 max backups
1105            };
1106
1107            // Create the file writer
1108            let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
1109            let log_guard = Logger::init_with_config(
1110                TraderId::from("TRADER-001"),
1111                UUID4::new(),
1112                config,
1113                file_config,
1114            );
1115
1116            log::info!(
1117                component = "Test";
1118                "Test log message with enough content to exceed our small max file size limit"
1119            );
1120
1121            sleep(Duration::from_millis(100));
1122
1123            // Count the number of log files in the directory
1124            let files: Vec<_> = std::fs::read_dir(&dir_path)
1125                .expect("Failed to read directory")
1126                .filter_map(Result::ok)
1127                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1128                .collect();
1129
1130            // We should have multiple files due to rotation
1131            assert_eq!(files.len(), 1);
1132
1133            log::info!(
1134                component = "Test";
1135                "Test log message with enough content to exceed our small max file size limit"
1136            );
1137
1138            sleep(Duration::from_millis(100));
1139
1140            // Count the number of log files in the directory
1141            let files: Vec<_> = std::fs::read_dir(&dir_path)
1142                .expect("Failed to read directory")
1143                .filter_map(Result::ok)
1144                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1145                .collect();
1146
1147            // We should have multiple files due to rotation
1148            assert_eq!(files.len(), 2);
1149
1150            for _ in 0..5 {
1151                // Write enough data to trigger a few rotations
1152                log::info!(
1153                component = "Test";
1154                "Test log message with enough content to exceed our small max file size limit"
1155                );
1156
1157                sleep(Duration::from_millis(100));
1158            }
1159
1160            // Count the number of log files in the directory
1161            let files: Vec<_> = std::fs::read_dir(&dir_path)
1162                .expect("Failed to read directory")
1163                .filter_map(Result::ok)
1164                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1165                .collect();
1166
1167            // We should have at most max_backups + 1 files (current file + backups)
1168            assert!(
1169                files.len() == max_backups as usize + 1,
1170                "Expected at most {} log files, found {}",
1171                max_backups,
1172                files.len()
1173            );
1174
1175            // Clean up
1176            drop(log_guard);
1177            drop(temp_dir);
1178        }
1179    }
1180}