nautilus_common/logging/
logger.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env,
19    fmt::Display,
20    str::FromStr,
21    sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27    kv::{ToValue, Value},
28    set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31    UUID4, UnixNanos,
32    datetime::unix_nanos_to_iso8601,
33    time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41    enums::{LogColor, LogLevel},
42    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
/// Name assigned to the background logging thread.
const LOGGING: &str = "logging";
/// Structured key used to pass a `LogColor` (as `u8`) through the `log` crate's key-value API.
const KV_COLOR: &str = "color";
/// Structured key used to pass a component name through the `log` crate's key-value API.
const KV_COMPONENT: &str = "component";

/// Global log sender which allows multiple log guards per process.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Global handle to the logging thread - only one thread exists per process.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
/// Configuration controlling log levels, per-component filtering, and output
/// formatting for the [`Logger`].
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum log level to write to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum log level to write to file (disabled is `Off`).
    pub fileout_level: LevelFilter,
    /// Per-component log levels, allowing finer-grained control.
    component_level: HashMap<Ustr, LevelFilter>,
    /// If logger is using ANSI color codes.
    pub is_colored: bool,
    /// If the configuration should be printed to stdout at initialization.
    pub print_config: bool,
}
72
73impl Default for LoggerConfig {
74    /// Creates a new default [`LoggerConfig`] instance.
75    fn default() -> Self {
76        Self {
77            stdout_level: LevelFilter::Info,
78            fileout_level: LevelFilter::Off,
79            component_level: HashMap::new(),
80            is_colored: true,
81            print_config: false,
82        }
83    }
84}
85
86impl LoggerConfig {
87    /// Creates a new [`LoggerConfig`] instance.
88    #[must_use]
89    pub const fn new(
90        stdout_level: LevelFilter,
91        fileout_level: LevelFilter,
92        component_level: HashMap<Ustr, LevelFilter>,
93        is_colored: bool,
94        print_config: bool,
95    ) -> Self {
96        Self {
97            stdout_level,
98            fileout_level,
99            component_level,
100            is_colored,
101            print_config,
102        }
103    }
104
105    /// # Errors
106    ///
107    /// Returns an error if the spec string is invalid.
108    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
109        let mut config = Self::default();
110        for kv in spec.split(';') {
111            let kv = kv.trim();
112            if kv.is_empty() {
113                continue;
114            }
115            let kv_lower = kv.to_lowercase(); // For case-insensitive comparison
116            if kv_lower == "is_colored" {
117                config.is_colored = true;
118            } else if kv_lower == "print_config" {
119                config.print_config = true;
120            } else {
121                let parts: Vec<&str> = kv.split('=').collect();
122                if parts.len() != 2 {
123                    anyhow::bail!("Invalid spec pair: {}", kv);
124                }
125                let k = parts[0].trim(); // Trim key
126                let v = parts[1].trim(); // Trim value
127                let lvl = LevelFilter::from_str(v)
128                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
129                let k_lower = k.to_lowercase(); // Case-insensitive key matching
130                match k_lower.as_str() {
131                    "stdout" => config.stdout_level = lvl,
132                    "fileout" => config.fileout_level = lvl,
133                    _ => {
134                        config.component_level.insert(Ustr::from(k), lvl);
135                    }
136                }
137            }
138        }
139        Ok(config)
140    }
141
142    /// Retrieves the logger configuration from the "`NAUTILUS_LOG`" environment variable.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the variable is unset or invalid.
147    pub fn from_env() -> anyhow::Result<Self> {
148        let spec = env::var("NAUTILUS_LOG")?;
149        Self::from_spec(&spec)
150    }
151}
152
/// A high-performance logger utilizing a MPSC channel under the hood.
///
/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
/// sent via an MPSC channel. This type implements [`log::Log`] and is registered as the
/// process-wide logger via `set_boxed_logger`.
#[derive(Debug)]
pub struct Logger {
    /// Configuration for logging levels and behavior.
    pub config: LoggerConfig,
    /// Transmitter for sending log events to the 'logging' thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
165
/// Represents a type of log event.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line event.
    Log(LogLine),
    /// A command to flush all logger buffers.
    Flush,
    /// A command to close the logger.
    ///
    /// Causes the logging thread to flush, drain any remaining events, and exit.
    Close,
}
176
/// Represents a log event which includes a message.
///
/// Serializable so it can be written as structured JSON by the file writer.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp for the event.
    pub timestamp: UnixNanos,
    /// The log level for the event.
    pub level: Level,
    /// The color for the log message content.
    pub color: LogColor,
    /// The Nautilus system component the log event originated from.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}
191
192impl Display for LogLine {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
195    }
196}
197
/// A wrapper around a log line that provides formatted and cached representations.
///
/// This struct contains a log line and provides various formatted versions
/// of it, such as plain string, colored string, and JSON. It also caches the
/// results for repeated calls, optimizing performance when the same message
/// needs to be logged multiple times in different formats.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line that contains the log data.
    line: LogLine,
    /// Cached plain string representation of the log line.
    cache: Option<String>,
    /// Cached colored string representation of the log line.
    colored: Option<String>,
    /// The ID of the trader associated with this log event (included in formatted output).
    trader_id: Ustr,
}
215
216impl LogLineWrapper {
217    /// Creates a new [`LogLineWrapper`] instance.
218    #[must_use]
219    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
220        Self {
221            line,
222            cache: None,
223            colored: None,
224            trader_id,
225        }
226    }
227
228    /// Returns the plain log message string, caching the result.
229    ///
230    /// This method constructs the log line format and caches it for repeated calls. Useful when the
231    /// same log message needs to be printed multiple times.
232    pub fn get_string(&mut self) -> &str {
233        self.cache.get_or_insert_with(|| {
234            format!(
235                "{} [{}] {}.{}: {}\n",
236                unix_nanos_to_iso8601(self.line.timestamp),
237                self.line.level,
238                self.trader_id,
239                &self.line.component,
240                &self.line.message,
241            )
242        })
243    }
244
245    /// Returns the colored log message string, caching the result.
246    ///
247    /// This method constructs the colored log line format and caches the result
248    /// for repeated calls, providing the message with ANSI color codes if the
249    /// logger is configured to use colors.
250    pub fn get_colored(&mut self) -> &str {
251        self.colored.get_or_insert_with(|| {
252            format!(
253                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
254                unix_nanos_to_iso8601(self.line.timestamp),
255                &self.line.color.as_ansi(),
256                self.line.level,
257                self.trader_id,
258                &self.line.component,
259                &self.line.message,
260            )
261        })
262    }
263
264    /// Returns the log message as a JSON string.
265    ///
266    /// This method serializes the log line and its associated metadata
267    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
268    /// for structured logging or when logs need to be stored in a JSON format.
269    /// # Panics
270    ///
271    /// Panics if serialization of the log event to JSON fails.
272    #[must_use]
273    pub fn get_json(&self) -> String {
274        let json_string =
275            serde_json::to_string(&self).expect("Error serializing log event to string");
276        format!("{json_string}\n")
277    }
278}
279
280impl Serialize for LogLineWrapper {
281    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
282    where
283        S: Serializer,
284    {
285        let mut json_obj = IndexMap::new();
286        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
287        json_obj.insert("timestamp".to_string(), timestamp);
288        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
289        json_obj.insert("level".to_string(), self.line.level.to_string());
290        json_obj.insert("color".to_string(), self.line.color.to_string());
291        json_obj.insert("component".to_string(), self.line.component.to_string());
292        json_obj.insert("message".to_string(), self.line.message.to_string());
293
294        json_obj.serialize(serializer)
295    }
296}
297
impl Log for Logger {
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        // The bypass flag (set during shutdown) short-circuits everything.
        // `Error` records are always considered enabled; otherwise the record
        // must pass either the stdout or fileout level filter.
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            // Timestamp source depends on the global clock mode (realtime vs static)
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            // Optional "color" key-value carries a numeric `LogColor`;
            // defaults to a color derived from the level
            let color: LogColor = key_values
                .get(KV_COLOR.into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            // Optional "component" key-value; falls back to the record's target
            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            // If the receiver has hung up, surface the lost line on stderr
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        // Don't attempt to flush if we're already bypassed/shutdown
        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
            return;
        }

        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event: {e}");
        }
    }
}
348
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logger based on the `NAUTILUS_LOG` environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the environment variable or parsing the configuration fails.
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Initializes the logger with the given `LoggerConfig` and `FileWriterConfig`,
    /// registering it as the global `log` implementation and spawning the background
    /// logging thread.
    ///
    /// # Errors
    ///
    /// Returns an error if the logger fails to register or initialize the background thread.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");
    /// let file_config = FileWriterConfig::default();
    /// let log_guard = Logger::init_with_config(trader_id, instance_id, config, file_config);
    /// ```
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        set_boxed_logger(Box::new(logger))?;

        // Store the sender globally so additional guards can be created
        if LOGGER_TX.set(tx.clone()).is_err() {
            debug_assert!(
                false,
                "LOGGER_TX already set - re-initialization not supported"
            );
        }

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        // Spawn the dedicated logging thread which owns all writers
        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        // Store the handle globally
        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
            debug_assert!(
                handle_guard.is_none(),
                "LOGGER_HANDLE already set - re-initialization not supported"
            );
            *handle_guard = Some(handle);
        }

        // Level filtering is performed by `Logger::enabled` and the writers,
        // so the `log` facade itself is left wide open
        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        LogGuard::new()
            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
    }

    /// Runs the event loop on the dedicated logging thread: receives [`LogEvent`]s
    /// from `rx` and dispatches them to the stdout/stderr/file writers until a
    /// `Close` event (or channel disconnect) terminates the loop.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        // Set up std I/O buffers
        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // Conditionally create file writer based on fileout_level
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(
                trader_id.clone(),
                instance_id.clone(),
                file_config.clone(),
                fileout_level,
            )
        };

        let process_event = |event: LogEvent,
                             stdout_writer: &mut StdoutWriter,
                             stderr_writer: &mut StderrWriter,
                             file_writer_opt: &mut Option<FileWriter>| {
            match event {
                LogEvent::Log(line) => {
                    // Per-component overrides take precedence over writer-level filters
                    if let Some(&filter_level) = component_level.get(&line.component)
                        && line.level > filter_level
                    {
                        return;
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    // File output is either JSON lines or the plain format
                    if let Some(file_writer) = file_writer_opt
                        && file_writer.enabled(&wrapper.line)
                    {
                        if file_writer.json_format {
                            file_writer.write(&wrapper.get_json());
                        } else {
                            file_writer.write(wrapper.get_string());
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // Close handled in the main loop; ignore here.
                }
            }
        };

        // Continue to receive and handle log events until channel is hung up
        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(_) | LogEvent::Flush => process_event(
                    event,
                    &mut stdout_writer,
                    &mut stderr_writer,
                    &mut file_writer_opt,
                ),
                LogEvent::Close => {
                    // First flush what's been written so far
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    // Drain any remaining events that may have raced with shutdown
                    // This ensures logs enqueued just before/around shutdown aren't lost.
                    while let Ok(evt) = rx.try_recv() {
                        match evt {
                            LogEvent::Close => (), // ignore extra Close events
                            _ => process_event(
                                evt,
                                &mut stdout_writer,
                                &mut stderr_writer,
                                &mut file_writer_opt,
                            ),
                        }
                    }

                    // Final flush after draining
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}
574
575/// Gracefully shuts down the logging subsystem by preventing new log events,
576/// signaling the logging thread to close, draining pending messages, and joining
577/// the logging thread.
578pub(crate) fn shutdown_graceful() {
579    // Prevent further logging
580    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
581    log::set_max_level(log::LevelFilter::Off);
582
583    // Signal Close if the sender exists
584    if let Some(tx) = LOGGER_TX.get() {
585        let _ = tx.send(LogEvent::Close);
586    }
587
588    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
589        && let Some(handle) = handle_guard.take()
590        && handle.thread().id() != std::thread::current().id()
591    {
592        let _ = handle.join();
593    }
594
595    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
596}
597
598pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
599    let color = Value::from(color as u8);
600
601    match level {
602        LogLevel::Off => {}
603        LogLevel::Trace => {
604            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
605        }
606        LogLevel::Debug => {
607            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
608        }
609        LogLevel::Info => {
610            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
611        }
612        LogLevel::Warning => {
613            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
614        }
615        LogLevel::Error => {
616            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
617        }
618    }
619}
620
/// A guard that manages the lifecycle of the logging subsystem.
///
/// `LogGuard` ensures the logging thread remains active while instances exist and properly
/// terminates when all guards are dropped. The system uses reference counting to track active
/// guards - when the last `LogGuard` is dropped, the logging thread is joined to ensure all
/// pending log messages are written before the process terminates.
///
/// # Reference Counting
///
/// The logging system maintains a global atomic counter of active `LogGuard` instances. This
/// ensures that:
/// - The logging thread remains active as long as at least one `LogGuard` exists.
/// - All log messages are properly flushed when intermediate guards are dropped.
/// - The logging thread is cleanly terminated and joined when the last guard is dropped.
///
/// # Limits
///
/// The system supports a maximum of 255 concurrent `LogGuard` instances.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to request a flush (or close, when last) on drop.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
647
648impl LogGuard {
649    /// Creates a new [`LogGuard`] instance from the global logger.
650    ///
651    /// Returns `None` if logging has not been initialized.
652    ///
653    /// # Panics
654    ///
655    /// Panics if the number of active LogGuards would exceed 255.
656    #[must_use]
657    pub fn new() -> Option<Self> {
658        LOGGER_TX.get().map(|tx| {
659            LOGGING_GUARDS_ACTIVE
660                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
661                    if count == u8::MAX {
662                        None // Reject the update if we're at the limit
663                    } else {
664                        Some(count + 1)
665                    }
666                })
667                .expect("Maximum number of active LogGuards (255) exceeded");
668
669            Self { tx: tx.clone() }
670        })
671    }
672}
673
impl Drop for LogGuard {
    /// Decrements the active-guard count; the last guard to drop shuts down
    /// and joins the logging thread, while earlier drops only request a flush.
    fn drop(&mut self) {
        // Atomically decrement; underflow indicates a bookkeeping bug
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                if count == 0 {
                    panic!("LogGuard reference count underflow");
                }
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // Check if this was the last LogGuard - re-check after decrement to avoid race
        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
            // This is truly the last LogGuard, so we should close the logger and join the thread
            // to ensure all log messages are written before the process terminates.
            // Prevent any new log events from being accepted while shutting down.
            LOGGING_BYPASSED.store(true, Ordering::SeqCst);

            // Disable all log levels to reduce overhead on late calls
            log::set_max_level(log::LevelFilter::Off);

            // Ensure Close is delivered before joining (critical for shutdown)
            let _ = self.tx.send(LogEvent::Close);

            // Join the logging thread to ensure all pending logs are written
            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
                && let Some(handle) = handle_guard.take()
            {
                // Avoid self-join deadlock
                if handle.thread().id() != std::thread::current().id() {
                    let _ = handle.join();
                }
            }

            // Reset LOGGING_INITIALIZED since the logging thread has terminated
            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
        } else {
            // Other LogGuards are still active, just flush our logs
            let _ = self.tx.send(LogEvent::Flush);
        }
    }
}
716
717////////////////////////////////////////////////////////////////////////////////
718// Tests
719////////////////////////////////////////////////////////////////////////////////
720#[cfg(test)]
721mod tests {
722    use std::{collections::HashMap, thread::sleep, time::Duration};
723
724    use log::LevelFilter;
725    use nautilus_core::UUID4;
726    use nautilus_model::identifiers::TraderId;
727    use rstest::*;
728    use serde_json::Value;
729    use tempfile::tempdir;
730    use ustr::Ustr;
731
732    use super::*;
733    use crate::{
734        enums::LogColor,
735        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
736        testing::wait_until,
737    };
738
    #[rstest]
    fn log_message_serialization() {
        // A serialized `LogLine` should expose level/component/message as JSON fields
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }
756
    #[rstest]
    fn log_config_parsing() {
        // Spec with a flag, both writer levels, and a component override
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                is_colored: true,
                print_config: false,
            }
        );
    }
776
    #[rstest]
    fn log_config_parsing2() {
        // Trailing semicolon is tolerated; `print_config` flag is recognized
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                is_colored: true,
                print_config: true,
            }
        );
    }
791
792    // These tests need to run serially because they use global logging state
793    mod serial_tests {
794        use super::*;
795
796        #[rstest]
797        fn test_logging_to_file() {
798            let config = LoggerConfig {
799                fileout_level: LevelFilter::Debug,
800                ..Default::default()
801            };
802
803            let temp_dir = tempdir().expect("Failed to create temporary directory");
804            let file_config = FileWriterConfig {
805                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
806                ..Default::default()
807            };
808
809            let log_guard = Logger::init_with_config(
810                TraderId::from("TRADER-001"),
811                UUID4::new(),
812                config,
813                file_config,
814            );
815
816            logging_clock_set_static_mode();
817            logging_clock_set_static_time(1_650_000_000_000_000);
818
819            log::info!(
820                component = "RiskEngine";
821                "This is a test."
822            );
823
824            let mut log_contents = String::new();
825
826            wait_until(
827                || {
828                    std::fs::read_dir(&temp_dir)
829                        .expect("Failed to read directory")
830                        .filter_map(Result::ok)
831                        .any(|entry| entry.path().is_file())
832                },
833                Duration::from_secs(3),
834            );
835
836            drop(log_guard); // Ensure log buffers are flushed
837
838            wait_until(
839                || {
840                    let log_file_path = std::fs::read_dir(&temp_dir)
841                        .expect("Failed to read directory")
842                        .filter_map(Result::ok)
843                        .find(|entry| entry.path().is_file())
844                        .expect("No files found in directory")
845                        .path();
846                    dbg!(&log_file_path);
847                    log_contents = std::fs::read_to_string(log_file_path)
848                        .expect("Error while reading log file");
849                    !log_contents.is_empty()
850                },
851                Duration::from_secs(3),
852            );
853
854            assert_eq!(
855                log_contents,
856                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
857            );
858        }
859
860        #[rstest]
861        fn test_shutdown_drains_backlog_tail() {
862            // Configure file logging at Info level
863            let config = LoggerConfig {
864                stdout_level: LevelFilter::Off,
865                fileout_level: LevelFilter::Info,
866                ..Default::default()
867            };
868
869            let temp_dir = tempdir().expect("Failed to create temporary directory");
870            let file_config = FileWriterConfig {
871                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
872                ..Default::default()
873            };
874
875            let log_guard = Logger::init_with_config(
876                TraderId::from("TRADER-TAIL"),
877                UUID4::new(),
878                config,
879                file_config,
880            )
881            .expect("Failed to initialize logger");
882
883            // Use static time for reproducibility
884            logging_clock_set_static_mode();
885            logging_clock_set_static_time(1_700_000_000_000_000);
886
887            // Enqueue a known number of messages synchronously
888            const N: usize = 1000;
889            for i in 0..N {
890                log::info!(component = "TailDrain"; "BacklogTest {i}");
891            }
892
893            // Drop guard to trigger shutdown (bypass + close + drain)
894            drop(log_guard);
895
896            // Wait until the file exists and contains at least N lines with our marker
897            let mut count = 0usize;
898            wait_until(
899                || {
900                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
901                        .expect("Failed to read directory")
902                        .filter_map(Result::ok)
903                        .find(|entry| entry.path().is_file())
904                    {
905                        let log_file_path = log_file.path();
906                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
907                            count = contents
908                                .lines()
909                                .filter(|l| l.contains("BacklogTest "))
910                                .count();
911                            count >= N
912                        } else {
913                            false
914                        }
915                    } else {
916                        false
917                    }
918                },
919                Duration::from_secs(5),
920            );
921
922            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
923        }
924
925        #[rstest]
926        fn test_log_component_level_filtering() {
927            let config =
928                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
929
930            let temp_dir = tempdir().expect("Failed to create temporary directory");
931            let file_config = FileWriterConfig {
932                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
933                ..Default::default()
934            };
935
936            let log_guard = Logger::init_with_config(
937                TraderId::from("TRADER-001"),
938                UUID4::new(),
939                config,
940                file_config,
941            );
942
943            logging_clock_set_static_mode();
944            logging_clock_set_static_time(1_650_000_000_000_000);
945
946            log::info!(
947                component = "RiskEngine";
948                "This is a test."
949            );
950
951            drop(log_guard); // Ensure log buffers are flushed
952
953            wait_until(
954                || {
955                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
956                        .expect("Failed to read directory")
957                        .filter_map(Result::ok)
958                        .find(|entry| entry.path().is_file())
959                    {
960                        let log_file_path = log_file.path();
961                        let log_contents = std::fs::read_to_string(log_file_path)
962                            .expect("Error while reading log file");
963                        !log_contents.contains("RiskEngine")
964                    } else {
965                        false
966                    }
967                },
968                Duration::from_secs(3),
969            );
970
971            assert!(
972                std::fs::read_dir(&temp_dir)
973                    .expect("Failed to read directory")
974                    .filter_map(Result::ok)
975                    .any(|entry| entry.path().is_file()),
976                "Log file exists"
977            );
978        }
979
980        #[rstest]
981        fn test_logging_to_file_in_json_format() {
982            let config =
983                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
984                    .unwrap();
985
986            let temp_dir = tempdir().expect("Failed to create temporary directory");
987            let file_config = FileWriterConfig {
988                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
989                file_format: Some("json".to_string()),
990                ..Default::default()
991            };
992
993            let log_guard = Logger::init_with_config(
994                TraderId::from("TRADER-001"),
995                UUID4::new(),
996                config,
997                file_config,
998            );
999
1000            logging_clock_set_static_mode();
1001            logging_clock_set_static_time(1_650_000_000_000_000);
1002
1003            log::info!(
1004                component = "RiskEngine";
1005                "This is a test."
1006            );
1007
1008            let mut log_contents = String::new();
1009
1010            drop(log_guard); // Ensure log buffers are flushed
1011
1012            wait_until(
1013                || {
1014                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
1015                        .expect("Failed to read directory")
1016                        .filter_map(Result::ok)
1017                        .find(|entry| entry.path().is_file())
1018                    {
1019                        let log_file_path = log_file.path();
1020                        log_contents = std::fs::read_to_string(log_file_path)
1021                            .expect("Error while reading log file");
1022                        !log_contents.is_empty()
1023                    } else {
1024                        false
1025                    }
1026                },
1027                Duration::from_secs(3),
1028            );
1029
1030            assert_eq!(
1031                log_contents,
1032                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
1033            );
1034        }
1035
1036        #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
1037        #[rstest]
1038        fn test_file_rotation_and_backup_limits() {
1039            // Create a temporary directory for log files
1040            let temp_dir = tempdir().expect("Failed to create temporary directory");
1041            let dir_path = temp_dir.path().to_str().unwrap().to_string();
1042
1043            // Configure a small max file size to trigger rotation quickly
1044            let max_backups = 3;
1045            let max_file_size = 100;
1046            let file_config = FileWriterConfig {
1047                directory: Some(dir_path.clone()),
1048                file_name: None,
1049                file_format: Some("log".to_string()),
1050                file_rotate: Some((max_file_size, max_backups).into()), // 100 bytes max size, 3 max backups
1051            };
1052
1053            // Create the file writer
1054            let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
1055            let log_guard = Logger::init_with_config(
1056                TraderId::from("TRADER-001"),
1057                UUID4::new(),
1058                config,
1059                file_config,
1060            );
1061
1062            log::info!(
1063                component = "Test";
1064                "Test log message with enough content to exceed our small max file size limit"
1065            );
1066
1067            sleep(Duration::from_millis(100));
1068
1069            // Count the number of log files in the directory
1070            let files: Vec<_> = std::fs::read_dir(&dir_path)
1071                .expect("Failed to read directory")
1072                .filter_map(Result::ok)
1073                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1074                .collect();
1075
1076            // We should have multiple files due to rotation
1077            assert_eq!(files.len(), 1);
1078
1079            log::info!(
1080                component = "Test";
1081                "Test log message with enough content to exceed our small max file size limit"
1082            );
1083
1084            sleep(Duration::from_millis(100));
1085
1086            // Count the number of log files in the directory
1087            let files: Vec<_> = std::fs::read_dir(&dir_path)
1088                .expect("Failed to read directory")
1089                .filter_map(Result::ok)
1090                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1091                .collect();
1092
1093            // We should have multiple files due to rotation
1094            assert_eq!(files.len(), 2);
1095
1096            for _ in 0..5 {
1097                // Write enough data to trigger a few rotations
1098                log::info!(
1099                component = "Test";
1100                "Test log message with enough content to exceed our small max file size limit"
1101                );
1102
1103                sleep(Duration::from_millis(100));
1104            }
1105
1106            // Count the number of log files in the directory
1107            let files: Vec<_> = std::fs::read_dir(&dir_path)
1108                .expect("Failed to read directory")
1109                .filter_map(Result::ok)
1110                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1111                .collect();
1112
1113            // We should have at most max_backups + 1 files (current file + backups)
1114            assert!(
1115                files.len() == max_backups as usize + 1,
1116                "Expected at most {} log files, found {}",
1117                max_backups,
1118                files.len()
1119            );
1120
1121            // Clean up
1122            drop(log_guard);
1123            drop(temp_dir);
1124        }
1125    }
1126}