nautilus_common/logging/
logger.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    fmt::Display,
18    sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
19};
20
21use indexmap::IndexMap;
22use log::{
23    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
24    kv::{ToValue, Value},
25    set_boxed_logger, set_max_level,
26};
27use nautilus_core::{
28    UUID4, UnixNanos,
29    datetime::unix_nanos_to_iso8601,
30    time::{get_atomic_clock_realtime, get_atomic_clock_static},
31};
32use nautilus_model::identifiers::TraderId;
33use serde::{Deserialize, Serialize, Serializer};
34use ustr::Ustr;
35
36pub use super::config::LoggerConfig;
37use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
38use crate::{
39    enums::{LogColor, LogLevel},
40    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
41};
42
/// Name given to the background thread that drains the log channel.
const LOGGING: &str = "logging";
/// Key of the structured key-value pair carrying the log color as an integer code.
const KV_COLOR: &str = "color";
/// Key of the structured key-value pair carrying the originating component name.
const KV_COMPONENT: &str = "component";

/// Global log sender which allows multiple log guards per process.
///
/// Set once during logger initialization; each `LogGuard` holds a clone of it.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Global handle to the logging thread - only one thread exists per process.
///
/// Stored at initialization and taken out again when the logging thread is joined.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
52
/// A high-performance logger utilizing a MPSC channel under the hood.
///
/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
/// sent via an MPSC channel.
///
/// The [`Log`] implementation only builds a [`LogLine`] and enqueues it; writing to
/// stdout, stderr, and the log file happens on the dedicated `logging` thread.
#[derive(Debug)]
pub struct Logger {
    /// Configuration for logging levels and behavior.
    pub config: LoggerConfig,
    /// Transmitter for sending log events to the 'logging' thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
65
/// Represents a type of log event.
///
/// These are the messages exchanged over the logging channel between producers
/// (the [`Logger`] and [`LogGuard`]s) and the `logging` thread.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line event.
    Log(LogLine),
    /// A command to flush all logger buffers.
    Flush,
    /// A command to close the logger.
    ///
    /// The logging thread flushes, drains any events still queued, flushes again,
    /// and then exits its receive loop.
    Close,
}
76
77/// Represents a log event which includes a message.
78#[derive(Clone, Debug, Serialize, Deserialize)]
79pub struct LogLine {
80    /// The timestamp for the event.
81    pub timestamp: UnixNanos,
82    /// The log level for the event.
83    pub level: Level,
84    /// The color for the log message content.
85    pub color: LogColor,
86    /// The Nautilus system component the log event originated from.
87    pub component: Ustr,
88    /// The log message content.
89    pub message: String,
90}
91
92impl Display for LogLine {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
95    }
96}
97
98/// A wrapper around a log line that provides formatted and cached representations.
99///
100/// This struct contains a log line and provides various formatted versions
101/// of it, such as plain string, colored string, and JSON. It also caches the
102/// results for repeated calls, optimizing performance when the same message
103/// needs to be logged multiple times in different formats.
104#[derive(Clone, Debug)]
105pub struct LogLineWrapper {
106    /// The underlying log line that contains the log data.
107    line: LogLine,
108    /// Cached plain string representation of the log line.
109    cache: Option<String>,
110    /// Cached colored string representation of the log line.
111    colored: Option<String>,
112    /// The ID of the trader associated with this log event.
113    trader_id: Ustr,
114}
115
116impl LogLineWrapper {
117    /// Creates a new [`LogLineWrapper`] instance.
118    #[must_use]
119    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
120        Self {
121            line,
122            cache: None,
123            colored: None,
124            trader_id,
125        }
126    }
127
128    /// Returns the plain log message string, caching the result.
129    ///
130    /// This method constructs the log line format and caches it for repeated calls. Useful when the
131    /// same log message needs to be printed multiple times.
132    pub fn get_string(&mut self) -> &str {
133        self.cache.get_or_insert_with(|| {
134            format!(
135                "{} [{}] {}.{}: {}\n",
136                unix_nanos_to_iso8601(self.line.timestamp),
137                self.line.level,
138                self.trader_id,
139                &self.line.component,
140                &self.line.message,
141            )
142        })
143    }
144
145    /// Returns the colored log message string, caching the result.
146    ///
147    /// This method constructs the colored log line format and caches the result
148    /// for repeated calls, providing the message with ANSI color codes if the
149    /// logger is configured to use colors.
150    pub fn get_colored(&mut self) -> &str {
151        self.colored.get_or_insert_with(|| {
152            format!(
153                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
154                unix_nanos_to_iso8601(self.line.timestamp),
155                &self.line.color.as_ansi(),
156                self.line.level,
157                self.trader_id,
158                &self.line.component,
159                &self.line.message,
160            )
161        })
162    }
163
164    /// Returns the log message as a JSON string.
165    ///
166    /// This method serializes the log line and its associated metadata
167    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
168    /// for structured logging or when logs need to be stored in a JSON format.
169    /// # Panics
170    ///
171    /// Panics if serialization of the log event to JSON fails.
172    #[must_use]
173    pub fn get_json(&self) -> String {
174        let json_string =
175            serde_json::to_string(&self).expect("Error serializing log event to string");
176        format!("{json_string}\n")
177    }
178}
179
180impl Serialize for LogLineWrapper {
181    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
182    where
183        S: Serializer,
184    {
185        let mut json_obj = IndexMap::new();
186        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
187        json_obj.insert("timestamp".to_string(), timestamp);
188        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
189        json_obj.insert("level".to_string(), self.line.level.to_string());
190        json_obj.insert("color".to_string(), self.line.color.to_string());
191        json_obj.insert("component".to_string(), self.line.component.to_string());
192        json_obj.insert("message".to_string(), self.line.message.clone());
193
194        json_obj.serialize(serializer)
195    }
196}
197
198impl Log for Logger {
199    fn enabled(&self, metadata: &log::Metadata) -> bool {
200        !LOGGING_BYPASSED.load(Ordering::Relaxed)
201            && (metadata.level() == Level::Error
202                || metadata.level() <= self.config.stdout_level
203                || metadata.level() <= self.config.fileout_level)
204    }
205
206    fn log(&self, record: &log::Record) {
207        if self.enabled(record.metadata()) {
208            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
209                get_atomic_clock_realtime().get_time_ns()
210            } else {
211                get_atomic_clock_static().get_time_ns()
212            };
213            let level = record.level();
214            let key_values = record.key_values();
215            let color: LogColor = key_values
216                .get(KV_COLOR.into())
217                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
218                .unwrap_or(level.into());
219            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
220                || Ustr::from(record.metadata().target()),
221                |v| Ustr::from(&v.to_string()),
222            );
223
224            let line = LogLine {
225                timestamp,
226                level,
227                color,
228                component,
229                message: format!("{}", record.args()),
230            };
231            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
232                eprintln!("Error sending log event (receiver closed): {line}");
233            }
234        }
235    }
236
237    fn flush(&self) {
238        // Don't attempt to flush if we're already bypassed/shutdown
239        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
240            return;
241        }
242
243        if let Err(e) = self.tx.send(LogEvent::Flush) {
244            eprintln!("Error sending flush log event: {e}");
245        }
246    }
247}
248
249#[allow(clippy::too_many_arguments)]
250impl Logger {
251    /// Initializes the logger based on the `NAUTILUS_LOG` environment variable.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if reading the environment variable or parsing the configuration fails.
256    pub fn init_with_env(
257        trader_id: TraderId,
258        instance_id: UUID4,
259        file_config: FileWriterConfig,
260    ) -> anyhow::Result<LogGuard> {
261        let config = LoggerConfig::from_env()?;
262        Self::init_with_config(trader_id, instance_id, config, file_config)
263    }
264
265    /// Initializes the logger with the given configuration.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the logger fails to register or initialize the background thread.
270    pub fn init_with_config(
271        trader_id: TraderId,
272        instance_id: UUID4,
273        config: LoggerConfig,
274        file_config: FileWriterConfig,
275    ) -> anyhow::Result<LogGuard> {
276        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
277
278        let logger_tx = tx.clone();
279        let logger = Self {
280            tx: logger_tx,
281            config: config.clone(),
282        };
283
284        set_boxed_logger(Box::new(logger))?;
285
286        // Store the sender globally so additional guards can be created
287        if LOGGER_TX.set(tx).is_err() {
288            debug_assert!(
289                false,
290                "LOGGER_TX already set - re-initialization not supported"
291            );
292        }
293
294        let is_colored = config.is_colored;
295
296        let print_config = config.print_config;
297        if print_config {
298            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
299            println!("Logger initialized with {config:?} {file_config:?}");
300        }
301
302        let handle = std::thread::Builder::new()
303            .name(LOGGING.to_string())
304            .spawn(move || {
305                Self::handle_messages(
306                    trader_id.to_string(),
307                    instance_id.to_string(),
308                    config,
309                    file_config,
310                    rx,
311                );
312            })?;
313
314        // Store the handle globally
315        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
316            debug_assert!(
317                handle_guard.is_none(),
318                "LOGGER_HANDLE already set - re-initialization not supported"
319            );
320            *handle_guard = Some(handle);
321        }
322
323        let max_level = log::LevelFilter::Trace;
324        set_max_level(max_level);
325
326        if print_config {
327            println!("Logger set as `log` implementation with max level {max_level}");
328        }
329
330        super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
331        super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
332
333        LogGuard::new()
334            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
335    }
336
337    fn handle_messages(
338        trader_id: String,
339        instance_id: String,
340        config: LoggerConfig,
341        file_config: FileWriterConfig,
342        rx: std::sync::mpsc::Receiver<LogEvent>,
343    ) {
344        let LoggerConfig {
345            stdout_level,
346            fileout_level,
347            component_level,
348            log_components_only,
349            is_colored,
350            print_config: _,
351        } = config;
352
353        let trader_id_cache = Ustr::from(&trader_id);
354
355        // Set up std I/O buffers
356        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
357        let mut stderr_writer = StderrWriter::new(is_colored);
358
359        // Conditionally create file writer based on fileout_level
360        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
361            None
362        } else {
363            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
364        };
365
366        let process_event = |event: LogEvent,
367                             stdout_writer: &mut StdoutWriter,
368                             stderr_writer: &mut StderrWriter,
369                             file_writer_opt: &mut Option<FileWriter>| {
370            match event {
371                LogEvent::Log(line) => {
372                    let component_filter_level = component_level.get(&line.component);
373
374                    if log_components_only && component_filter_level.is_none() {
375                        return;
376                    }
377
378                    if let Some(&filter_level) = component_filter_level
379                        && line.level > filter_level
380                    {
381                        return;
382                    }
383
384                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
385
386                    if stderr_writer.enabled(&wrapper.line) {
387                        if is_colored {
388                            stderr_writer.write(wrapper.get_colored());
389                        } else {
390                            stderr_writer.write(wrapper.get_string());
391                        }
392                    }
393
394                    if stdout_writer.enabled(&wrapper.line) {
395                        if is_colored {
396                            stdout_writer.write(wrapper.get_colored());
397                        } else {
398                            stdout_writer.write(wrapper.get_string());
399                        }
400                    }
401
402                    if let Some(file_writer) = file_writer_opt
403                        && file_writer.enabled(&wrapper.line)
404                    {
405                        if file_writer.json_format {
406                            file_writer.write(&wrapper.get_json());
407                        } else {
408                            file_writer.write(wrapper.get_string());
409                        }
410                    }
411                }
412                LogEvent::Flush => {
413                    stdout_writer.flush();
414                    stderr_writer.flush();
415
416                    if let Some(file_writer) = file_writer_opt {
417                        file_writer.flush();
418                    }
419                }
420                LogEvent::Close => {
421                    // Close handled in the main loop; ignore here.
422                }
423            }
424        };
425
426        // Continue to receive and handle log events until channel is hung up
427        while let Ok(event) = rx.recv() {
428            match event {
429                LogEvent::Log(_) | LogEvent::Flush => process_event(
430                    event,
431                    &mut stdout_writer,
432                    &mut stderr_writer,
433                    &mut file_writer_opt,
434                ),
435                LogEvent::Close => {
436                    // First flush what's been written so far
437                    stdout_writer.flush();
438                    stderr_writer.flush();
439
440                    if let Some(ref mut file_writer) = file_writer_opt {
441                        file_writer.flush();
442                    }
443
444                    // Drain any remaining events that may have raced with shutdown
445                    // This ensures logs enqueued just before/around shutdown aren't lost.
446                    while let Ok(evt) = rx.try_recv() {
447                        match evt {
448                            LogEvent::Close => (), // ignore extra Close events
449                            _ => process_event(
450                                evt,
451                                &mut stdout_writer,
452                                &mut stderr_writer,
453                                &mut file_writer_opt,
454                            ),
455                        }
456                    }
457
458                    // Final flush after draining
459                    stdout_writer.flush();
460                    stderr_writer.flush();
461
462                    if let Some(ref mut file_writer) = file_writer_opt {
463                        file_writer.flush();
464                    }
465
466                    break;
467                }
468            }
469        }
470    }
471}
472
473/// Gracefully shuts down the logging subsystem.
474///
475/// Performs the same shutdown sequence as dropping the last `LogGuard`, but can be called
476/// explicitly for deterministic shutdown timing (e.g., testing or Windows Python applications).
477///
478/// # Safety
479///
480/// Safe to call multiple times. Thread join is skipped if called from the logging thread.
481pub(crate) fn shutdown_graceful() {
482    // Prevent further logging
483    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
484    log::set_max_level(log::LevelFilter::Off);
485
486    // Signal Close if the sender exists
487    if let Some(tx) = LOGGER_TX.get() {
488        let _ = tx.send(LogEvent::Close);
489    }
490
491    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
492        && let Some(handle) = handle_guard.take()
493        && handle.thread().id() != std::thread::current().id()
494    {
495        let _ = handle.join();
496    }
497
498    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
499}
500
501pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
502    let color = Value::from(color as u8);
503
504    match level {
505        LogLevel::Off => {}
506        LogLevel::Trace => {
507            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
508        }
509        LogLevel::Debug => {
510            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
511        }
512        LogLevel::Info => {
513            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
514        }
515        LogLevel::Warning => {
516            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
517        }
518        LogLevel::Error => {
519            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
520        }
521    }
522}
523
524/// A guard that manages the lifecycle of the logging subsystem.
525///
526/// `LogGuard` ensures the logging thread remains active while instances exist and properly
527/// terminates when all guards are dropped. The system uses reference counting to track active
528/// guards - when the last `LogGuard` is dropped, the logging thread is joined to ensure all
529/// pending log messages are written before the process terminates.
530///
531/// # Reference Counting
532///
533/// The logging system maintains a global atomic counter of active `LogGuard` instances. This
534/// ensures that:
535/// - The logging thread remains active as long as at least one `LogGuard` exists.
536/// - All log messages are properly flushed when intermediate guards are dropped.
537/// - The logging thread is cleanly terminated and joined when the last guard is dropped.
538///
539/// # Shutdown Behavior
540///
541/// When the last guard is dropped, the logging thread is signaled to close, drains pending
542/// messages, and is joined to ensure all logs are written before process termination.
543///
544/// **Python on Windows:** Non-deterministic GC order during interpreter shutdown can
545/// occasionally prevent proper thread join, resulting in truncated logs.
546///
547/// # Limits
548///
549/// The system supports a maximum of 255 concurrent `LogGuard` instances.
550#[cfg_attr(
551    feature = "python",
552    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
553)]
554#[derive(Debug)]
555pub struct LogGuard {
556    tx: std::sync::mpsc::Sender<LogEvent>,
557}
558
559impl LogGuard {
560    /// Creates a new [`LogGuard`] instance from the global logger.
561    ///
562    /// Returns `None` if logging has not been initialized.
563    ///
564    /// # Panics
565    ///
566    /// Panics if the number of active LogGuards would exceed 255.
567    #[must_use]
568    pub fn new() -> Option<Self> {
569        LOGGER_TX.get().map(|tx| {
570            LOGGING_GUARDS_ACTIVE
571                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
572                    if count == u8::MAX {
573                        None // Reject the update if we're at the limit
574                    } else {
575                        Some(count + 1)
576                    }
577                })
578                .expect("Maximum number of active LogGuards (255) exceeded");
579
580            Self { tx: tx.clone() }
581        })
582    }
583}
584
585impl Drop for LogGuard {
586    /// Handles cleanup when a `LogGuard` is dropped.
587    ///
588    /// Sends `Flush` if other guards remain active, otherwise sends `Close`, joins the
589    /// logging thread, and resets the subsystem state.
590    fn drop(&mut self) {
591        let previous_count = LOGGING_GUARDS_ACTIVE
592            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
593                if count == 0 {
594                    panic!("LogGuard reference count underflow");
595                }
596                Some(count - 1)
597            })
598            .expect("Failed to decrement LogGuard count");
599
600        // Check if this was the last LogGuard - re-check after decrement to avoid race
601        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
602            // This is truly the last LogGuard, so we should close the logger and join the thread
603            // to ensure all log messages are written before the process terminates.
604            // Prevent any new log events from being accepted while shutting down.
605            LOGGING_BYPASSED.store(true, Ordering::SeqCst);
606
607            // Disable all log levels to reduce overhead on late calls
608            log::set_max_level(log::LevelFilter::Off);
609
610            // Ensure Close is delivered before joining (critical for shutdown)
611            let _ = self.tx.send(LogEvent::Close);
612
613            // Join the logging thread to ensure all pending logs are written
614            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
615                && let Some(handle) = handle_guard.take()
616            {
617                // Avoid self-join deadlock
618                if handle.thread().id() != std::thread::current().id() {
619                    let _ = handle.join();
620                }
621            }
622
623            // Reset LOGGING_INITIALIZED since the logging thread has terminated
624            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
625        } else {
626            // Other LogGuards are still active, just flush our logs
627            let _ = self.tx.send(LogEvent::Flush);
628        }
629    }
630}
631
632#[cfg(test)]
633mod tests {
634    use std::time::Duration;
635
636    use ahash::AHashMap;
637    use log::LevelFilter;
638    use nautilus_core::UUID4;
639    use nautilus_model::identifiers::TraderId;
640    use rstest::*;
641    use serde_json::Value;
642    use tempfile::tempdir;
643    use ustr::Ustr;
644
645    use super::*;
646    use crate::{
647        enums::LogColor,
648        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
649        testing::wait_until,
650    };
651
652    #[rstest]
653    fn log_message_serialization() {
654        let log_message = LogLine {
655            timestamp: UnixNanos::default(),
656            level: log::Level::Info,
657            color: LogColor::Normal,
658            component: Ustr::from("Portfolio"),
659            message: "This is a log message".to_string(),
660        };
661
662        let serialized_json = serde_json::to_string(&log_message).unwrap();
663        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
664
665        assert_eq!(deserialized_value["level"], "INFO");
666        assert_eq!(deserialized_value["component"], "Portfolio");
667        assert_eq!(deserialized_value["message"], "This is a log message");
668    }
669
670    #[rstest]
671    fn log_config_parsing() {
672        let config =
673            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
674                .unwrap();
675        assert_eq!(
676            config,
677            LoggerConfig {
678                stdout_level: LevelFilter::Info,
679                fileout_level: LevelFilter::Debug,
680                component_level: AHashMap::from_iter(vec![(
681                    Ustr::from("RiskEngine"),
682                    LevelFilter::Error
683                )]),
684                log_components_only: false,
685                is_colored: true,
686                print_config: false,
687            }
688        );
689    }
690
691    #[rstest]
692    fn log_config_parsing2() {
693        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
694        assert_eq!(
695            config,
696            LoggerConfig {
697                stdout_level: LevelFilter::Warn,
698                fileout_level: LevelFilter::Error,
699                component_level: AHashMap::new(),
700                log_components_only: false,
701                is_colored: true,
702                print_config: true,
703            }
704        );
705    }
706
707    #[rstest]
708    fn log_config_parsing_with_log_components_only() {
709        let config =
710            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
711        assert_eq!(
712            config,
713            LoggerConfig {
714                stdout_level: LevelFilter::Info,
715                fileout_level: LevelFilter::Off,
716                component_level: AHashMap::from_iter(vec![(
717                    Ustr::from("RiskEngine"),
718                    LevelFilter::Debug
719                )]),
720                log_components_only: true,
721                is_colored: true,
722                print_config: false,
723            }
724        );
725    }
726
727    #[rstest]
728    fn test_log_line_wrapper_plain_string() {
729        let line = LogLine {
730            timestamp: 1_650_000_000_000_000_000.into(),
731            level: log::Level::Info,
732            color: LogColor::Normal,
733            component: Ustr::from("TestComponent"),
734            message: "Test message".to_string(),
735        };
736
737        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
738        let result = wrapper.get_string();
739
740        assert!(result.contains("TRADER-001"));
741        assert!(result.contains("TestComponent"));
742        assert!(result.contains("Test message"));
743        assert!(result.contains("[INFO]"));
744        assert!(result.ends_with('\n'));
745        // Should NOT contain ANSI codes
746        assert!(!result.contains("\x1b["));
747    }
748
749    #[rstest]
750    fn test_log_line_wrapper_colored_string() {
751        let line = LogLine {
752            timestamp: 1_650_000_000_000_000_000.into(),
753            level: log::Level::Info,
754            color: LogColor::Green,
755            component: Ustr::from("TestComponent"),
756            message: "Test message".to_string(),
757        };
758
759        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
760        let result = wrapper.get_colored();
761
762        assert!(result.contains("TRADER-001"));
763        assert!(result.contains("TestComponent"));
764        assert!(result.contains("Test message"));
765        // Should contain ANSI codes
766        assert!(result.contains("\x1b["));
767        assert!(result.ends_with('\n'));
768    }
769
770    #[rstest]
771    fn test_log_line_wrapper_json_output() {
772        let line = LogLine {
773            timestamp: 1_650_000_000_000_000_000.into(),
774            level: log::Level::Warn,
775            color: LogColor::Yellow,
776            component: Ustr::from("RiskEngine"),
777            message: "Warning message".to_string(),
778        };
779
780        let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
781        let json = wrapper.get_json();
782
783        let parsed: Value = serde_json::from_str(json.trim()).unwrap();
784        assert_eq!(parsed["trader_id"], "TRADER-002");
785        assert_eq!(parsed["component"], "RiskEngine");
786        assert_eq!(parsed["message"], "Warning message");
787        assert_eq!(parsed["level"], "WARN");
788        assert_eq!(parsed["color"], "YELLOW");
789    }
790
791    #[rstest]
792    fn test_log_line_wrapper_caches_string() {
793        let line = LogLine {
794            timestamp: 1_650_000_000_000_000_000.into(),
795            level: log::Level::Info,
796            color: LogColor::Normal,
797            component: Ustr::from("Test"),
798            message: "Cached".to_string(),
799        };
800
801        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
802        let first = wrapper.get_string().to_string();
803        let second = wrapper.get_string().to_string();
804
805        assert_eq!(first, second);
806    }
807
808    #[rstest]
809    fn test_log_line_display() {
810        let line = LogLine {
811            timestamp: 0.into(),
812            level: log::Level::Error,
813            color: LogColor::Red,
814            component: Ustr::from("Component"),
815            message: "Error occurred".to_string(),
816        };
817
818        let display = format!("{line}");
819        assert_eq!(display, "[ERROR] Component: Error occurred");
820    }
821
822    // These tests use global logging state (one logger per process).
823    // They run correctly with cargo-nextest which isolates each test in its own process.
824    mod serial_tests {
825        use std::sync::atomic::Ordering;
826
827        use super::*;
828        use crate::logging::{LOGGING_BYPASSED, logging_is_initialized, logging_set_bypass};
829
830        #[rstest]
831        fn test_logging_to_file() {
832            let config = LoggerConfig {
833                fileout_level: LevelFilter::Debug,
834                ..Default::default()
835            };
836
837            let temp_dir = tempdir().expect("Failed to create temporary directory");
838            let file_config = FileWriterConfig {
839                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
840                ..Default::default()
841            };
842
843            let log_guard = Logger::init_with_config(
844                TraderId::from("TRADER-001"),
845                UUID4::new(),
846                config,
847                file_config,
848            );
849
850            logging_clock_set_static_mode();
851            logging_clock_set_static_time(1_650_000_000_000_000);
852
853            log::info!(
854                component = "RiskEngine";
855                "This is a test."
856            );
857
858            let mut log_contents = String::new();
859
860            wait_until(
861                || {
862                    std::fs::read_dir(&temp_dir)
863                        .expect("Failed to read directory")
864                        .filter_map(Result::ok)
865                        .any(|entry| entry.path().is_file())
866                },
867                Duration::from_secs(3),
868            );
869
870            drop(log_guard); // Ensure log buffers are flushed
871
872            wait_until(
873                || {
874                    let log_file_path = std::fs::read_dir(&temp_dir)
875                        .expect("Failed to read directory")
876                        .filter_map(Result::ok)
877                        .find(|entry| entry.path().is_file())
878                        .expect("No files found in directory")
879                        .path();
880                    log_contents = std::fs::read_to_string(log_file_path)
881                        .expect("Error while reading log file");
882                    !log_contents.is_empty()
883                },
884                Duration::from_secs(3),
885            );
886
887            assert_eq!(
888                log_contents,
889                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
890            );
891        }
892
893        #[rstest]
894        fn test_shutdown_drains_backlog_tail() {
895            // Configure file logging at Info level
896            let config = LoggerConfig {
897                stdout_level: LevelFilter::Off,
898                fileout_level: LevelFilter::Info,
899                ..Default::default()
900            };
901
902            let temp_dir = tempdir().expect("Failed to create temporary directory");
903            let file_config = FileWriterConfig {
904                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
905                ..Default::default()
906            };
907
908            let log_guard = Logger::init_with_config(
909                TraderId::from("TRADER-TAIL"),
910                UUID4::new(),
911                config,
912                file_config,
913            )
914            .expect("Failed to initialize logger");
915
916            // Use static time for reproducibility
917            logging_clock_set_static_mode();
918            logging_clock_set_static_time(1_700_000_000_000_000);
919
920            // Enqueue a known number of messages synchronously
921            const N: usize = 1000;
922            for i in 0..N {
923                log::info!(component = "TailDrain"; "BacklogTest {i}");
924            }
925
926            // Drop guard to trigger shutdown (bypass + close + drain)
927            drop(log_guard);
928
929            // Wait until the file exists and contains at least N lines with our marker
930            let mut count = 0usize;
931            wait_until(
932                || {
933                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
934                        .expect("Failed to read directory")
935                        .filter_map(Result::ok)
936                        .find(|entry| entry.path().is_file())
937                    {
938                        let log_file_path = log_file.path();
939                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
940                            count = contents
941                                .lines()
942                                .filter(|l| l.contains("BacklogTest "))
943                                .count();
944                            count >= N
945                        } else {
946                            false
947                        }
948                    } else {
949                        false
950                    }
951                },
952                Duration::from_secs(5),
953            );
954
955            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
956        }
957
958        #[rstest]
959        fn test_log_component_level_filtering() {
960            let config =
961                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
962
963            let temp_dir = tempdir().expect("Failed to create temporary directory");
964            let file_config = FileWriterConfig {
965                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
966                ..Default::default()
967            };
968
969            let log_guard = Logger::init_with_config(
970                TraderId::from("TRADER-001"),
971                UUID4::new(),
972                config,
973                file_config,
974            );
975
976            logging_clock_set_static_mode();
977            logging_clock_set_static_time(1_650_000_000_000_000);
978
979            log::info!(
980                component = "RiskEngine";
981                "This is a test."
982            );
983
984            drop(log_guard); // Ensure log buffers are flushed
985
986            wait_until(
987                || {
988                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
989                        .expect("Failed to read directory")
990                        .filter_map(Result::ok)
991                        .find(|entry| entry.path().is_file())
992                    {
993                        let log_file_path = log_file.path();
994                        let log_contents = std::fs::read_to_string(log_file_path)
995                            .expect("Error while reading log file");
996                        !log_contents.contains("RiskEngine")
997                    } else {
998                        false
999                    }
1000                },
1001                Duration::from_secs(3),
1002            );
1003
1004            assert!(
1005                std::fs::read_dir(&temp_dir)
1006                    .expect("Failed to read directory")
1007                    .filter_map(Result::ok)
1008                    .any(|entry| entry.path().is_file()),
1009                "Log file exists"
1010            );
1011        }
1012
1013        #[rstest]
1014        fn test_logging_to_file_in_json_format() {
1015            let config =
1016                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1017                    .unwrap();
1018
1019            let temp_dir = tempdir().expect("Failed to create temporary directory");
1020            let file_config = FileWriterConfig {
1021                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1022                file_format: Some("json".to_string()),
1023                ..Default::default()
1024            };
1025
1026            let log_guard = Logger::init_with_config(
1027                TraderId::from("TRADER-001"),
1028                UUID4::new(),
1029                config,
1030                file_config,
1031            );
1032
1033            logging_clock_set_static_mode();
1034            logging_clock_set_static_time(1_650_000_000_000_000);
1035
1036            log::info!(
1037                component = "RiskEngine";
1038                "This is a test."
1039            );
1040
1041            let mut log_contents = String::new();
1042
1043            drop(log_guard); // Ensure log buffers are flushed
1044
1045            wait_until(
1046                || {
1047                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1048                        .expect("Failed to read directory")
1049                        .filter_map(Result::ok)
1050                        .find(|entry| entry.path().is_file())
1051                    {
1052                        let log_file_path = log_file.path();
1053                        log_contents = std::fs::read_to_string(log_file_path)
1054                            .expect("Error while reading log file");
1055                        !log_contents.is_empty()
1056                    } else {
1057                        false
1058                    }
1059                },
1060                Duration::from_secs(3),
1061            );
1062
1063            assert_eq!(
1064                log_contents,
1065                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
1066            );
1067        }
1068
1069        #[rstest]
1070        fn test_init_sets_logging_is_initialized_flag() {
1071            let config = LoggerConfig::default();
1072            let file_config = FileWriterConfig::default();
1073
1074            let guard = Logger::init_with_config(
1075                TraderId::from("TRADER-001"),
1076                UUID4::new(),
1077                config,
1078                file_config,
1079            );
1080            assert!(guard.is_ok());
1081            assert!(logging_is_initialized());
1082
1083            drop(guard);
1084            assert!(!logging_is_initialized());
1085        }
1086
1087        #[rstest]
1088        fn test_reinit_after_guard_drop_fails() {
1089            let config = LoggerConfig::default();
1090            let file_config = FileWriterConfig::default();
1091
1092            let guard1 = Logger::init_with_config(
1093                TraderId::from("TRADER-001"),
1094                UUID4::new(),
1095                config.clone(),
1096                file_config.clone(),
1097            );
1098            assert!(guard1.is_ok());
1099            drop(guard1);
1100
1101            // Re-init fails because log crate's set_boxed_logger only works once per process
1102            let guard2 = Logger::init_with_config(
1103                TraderId::from("TRADER-002"),
1104                UUID4::new(),
1105                config,
1106                file_config,
1107            );
1108            assert!(guard2.is_err());
1109        }
1110
1111        #[rstest]
1112        fn test_bypass_before_init_prevents_logging() {
1113            logging_set_bypass();
1114            assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1115
1116            let temp_dir = tempdir().expect("Failed to create temporary directory");
1117            let config = LoggerConfig {
1118                fileout_level: LevelFilter::Debug,
1119                ..Default::default()
1120            };
1121            let file_config = FileWriterConfig {
1122                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1123                ..Default::default()
1124            };
1125
1126            let guard = Logger::init_with_config(
1127                TraderId::from("TRADER-001"),
1128                UUID4::new(),
1129                config,
1130                file_config,
1131            );
1132            assert!(guard.is_ok());
1133
1134            log::info!(
1135                component = "TestComponent";
1136                "This should be bypassed"
1137            );
1138            std::thread::sleep(Duration::from_millis(100));
1139            drop(guard);
1140
1141            // Bypass flag remains permanently set (no reset mechanism)
1142            assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1143        }
1144    }
1145}