1use std::{
17 collections::HashMap,
18 env,
19 fmt::Display,
20 str::FromStr,
21 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27 kv::{ToValue, Value},
28 set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31 UUID4, UnixNanos,
32 datetime::unix_nanos_to_iso8601,
33 time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41 enums::{LogColor, LogLevel},
42 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
45const LOGGING: &str = "logging";
46const KV_COLOR: &str = "color";
47const KV_COMPONENT: &str = "component";
48
49static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();
51
52static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
/// Configuration for the logging subsystem: output levels, per-component
/// overrides, and presentation flags.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum level written to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum level written to the log file (`Off` disables file output).
    pub fileout_level: LevelFilter,
    /// Per-component level overrides, keyed by component name.
    component_level: HashMap<Ustr, LevelFilter>,
    /// When `true`, only components listed in `component_level` are logged.
    pub log_components_only: bool,
    /// When `true`, terminal output uses ANSI colors.
    pub is_colored: bool,
    /// When `true`, the resolved configuration is printed at initialization.
    pub print_config: bool,
}
74
75impl Default for LoggerConfig {
76 fn default() -> Self {
78 Self {
79 stdout_level: LevelFilter::Info,
80 fileout_level: LevelFilter::Off,
81 component_level: HashMap::new(),
82 log_components_only: false,
83 is_colored: true,
84 print_config: false,
85 }
86 }
87}
88
89impl LoggerConfig {
90 #[must_use]
92 pub const fn new(
93 stdout_level: LevelFilter,
94 fileout_level: LevelFilter,
95 component_level: HashMap<Ustr, LevelFilter>,
96 log_components_only: bool,
97 is_colored: bool,
98 print_config: bool,
99 ) -> Self {
100 Self {
101 stdout_level,
102 fileout_level,
103 component_level,
104 log_components_only,
105 is_colored,
106 print_config,
107 }
108 }
109
110 pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
114 let mut config = Self::default();
115 for kv in spec.split(';') {
116 let kv = kv.trim();
117 if kv.is_empty() {
118 continue;
119 }
120 let kv_lower = kv.to_lowercase(); if kv_lower == "log_components_only" {
123 config.log_components_only = true;
124 } else if kv_lower == "is_colored" {
125 config.is_colored = true;
126 } else if kv_lower == "print_config" {
127 config.print_config = true;
128 } else {
129 let parts: Vec<&str> = kv.split('=').collect();
130 if parts.len() != 2 {
131 anyhow::bail!("Invalid spec pair: {}", kv);
132 }
133 let k = parts[0].trim(); let v = parts[1].trim(); let lvl = LevelFilter::from_str(v)
136 .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
137 let k_lower = k.to_lowercase(); match k_lower.as_str() {
139 "stdout" => config.stdout_level = lvl,
140 "fileout" => config.fileout_level = lvl,
141 _ => {
142 config.component_level.insert(Ustr::from(k), lvl);
143 }
144 }
145 }
146 }
147 Ok(config)
148 }
149
150 pub fn from_env() -> anyhow::Result<Self> {
156 let spec = env::var("NAUTILUS_LOG")?;
157 Self::from_spec(&spec)
158 }
159}
160
/// The `log` facade implementation: filters records and forwards them over a
/// channel to the background logging thread.
#[derive(Debug)]
pub struct Logger {
    /// Configuration controlling levels, colors, and component filtering.
    pub config: LoggerConfig,
    /// Sender side of the channel to the logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
173
/// Events sent to the background logging thread.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line to be written by the enabled writers.
    Log(LogLine),
    /// Request that all writers flush buffered output.
    Flush,
    /// Request that the logging thread drain remaining events and exit.
    Close,
}
184
/// A single timestamped, leveled log record emitted by a component.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// Time the record was created (UNIX nanoseconds).
    pub timestamp: UnixNanos,
    /// Severity level of the record.
    pub level: Level,
    /// Color applied when writing to an ANSI-capable terminal.
    pub color: LogColor,
    /// Name of the component which emitted the record.
    pub component: Ustr,
    /// The log message text.
    pub message: String,
}
199
200impl Display for LogLine {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
203 }
204}
205
/// Wraps a [`LogLine`] with lazily built, cached formatted representations
/// so each format (plain, colored, JSON) is produced at most once.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line.
    line: LogLine,
    /// Cached plain-text representation (built on first request).
    cache: Option<String>,
    /// Cached ANSI-colored representation (built on first request).
    colored: Option<String>,
    /// Trader ID stamped into every formatted line.
    trader_id: Ustr,
}
223
impl LogLineWrapper {
    /// Creates a new wrapper around `line` for the given `trader_id`.
    #[must_use]
    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
        Self {
            line,
            cache: None,
            colored: None,
            trader_id,
        }
    }

    /// Returns the plain-text formatted line (with trailing newline),
    /// formatting on first call and returning the cached string thereafter.
    pub fn get_string(&mut self) -> &str {
        self.cache.get_or_insert_with(|| {
            format!(
                "{} [{}] {}.{}: {}\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

    /// Returns the ANSI-colored formatted line (with trailing newline),
    /// formatting on first call and returning the cached string thereafter.
    pub fn get_colored(&mut self) -> &str {
        self.colored.get_or_insert_with(|| {
            // Bold timestamp, then the line's color code, reset at the end.
            format!(
                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                &self.line.color.as_ansi(),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

    /// Returns the line serialized as a single-line JSON object (with
    /// trailing newline). Not cached; serialized on every call.
    ///
    /// # Panics
    ///
    /// Panics if serialization fails (not expected for these field types).
    #[must_use]
    pub fn get_json(&self) -> String {
        let json_string =
            serde_json::to_string(&self).expect("Error serializing log event to string");
        format!("{json_string}\n")
    }
}
287
288impl Serialize for LogLineWrapper {
289 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290 where
291 S: Serializer,
292 {
293 let mut json_obj = IndexMap::new();
294 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
295 json_obj.insert("timestamp".to_string(), timestamp);
296 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
297 json_obj.insert("level".to_string(), self.line.level.to_string());
298 json_obj.insert("color".to_string(), self.line.color.to_string());
299 json_obj.insert("component".to_string(), self.line.component.to_string());
300 json_obj.insert("message".to_string(), self.line.message.to_string());
301
302 json_obj.serialize(serializer)
303 }
304}
305
impl Log for Logger {
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        // Errors are always let through; other levels must pass either the
        // stdout or fileout filter. Nothing passes once logging is bypassed.
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            // Timestamp from the realtime clock in live mode, otherwise from
            // the static (test-controlled) clock.
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            // Optional "color" key-value overrides the level's default color.
            let color: LogColor = key_values
                .get(KV_COLOR.into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            // Optional "component" key-value; falls back to the record target.
            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            // If the receiver is gone, recover the line from the send error
            // and fall back to stderr so the message is not lost silently.
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        // Skip flushing once the logging system has been bypassed/shut down.
        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
            return;
        }

        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event: {e}");
        }
    }
}
356
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logging system using the spec from the `NAUTILUS_LOG`
    /// environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if `NAUTILUS_LOG` is unset, the spec is invalid, or
    /// initialization fails (see [`Logger::init_with_config`]).
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Installs the logger as the global `log` implementation and spawns the
    /// background thread which owns all writers. Returns a [`LogGuard`] that
    /// keeps the logging system alive.
    ///
    /// # Errors
    ///
    /// Returns an error if a global logger is already set, the logging thread
    /// cannot be spawned, or the global sender was not registered.
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        set_boxed_logger(Box::new(logger))?;

        // Record the sender globally so `LogGuard::new` can attach later.
        if LOGGER_TX.set(tx.clone()).is_err() {
            debug_assert!(
                false,
                "LOGGER_TX already set - re-initialization not supported"
            );
        }

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        // The logging thread takes ownership of the receiver and all writers.
        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        // Stash the join handle so shutdown can wait for the thread to drain.
        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
            debug_assert!(
                handle_guard.is_none(),
                "LOGGER_HANDLE already set - re-initialization not supported"
            );
            *handle_guard = Some(handle);
        }

        // Let every record through the facade; per-writer filters apply the
        // configured levels downstream.
        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        LogGuard::new()
            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
    }

    /// Event loop run on the logging thread: receives events and dispatches
    /// them to the stdout/stderr/file writers until a `Close` event arrives.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            log_components_only,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // File output is disabled entirely when the fileout level is `Off`.
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(
                trader_id.clone(),
                instance_id.clone(),
                file_config.clone(),
                fileout_level,
            )
        };

        // Shared dispatch for Log/Flush events; also reused by the shutdown
        // drain loop below.
        let process_event = |event: LogEvent,
                             stdout_writer: &mut StdoutWriter,
                             stderr_writer: &mut StderrWriter,
                             file_writer_opt: &mut Option<FileWriter>| {
            match event {
                LogEvent::Log(line) => {
                    let component_filter_level = component_level.get(&line.component);

                    // In components-only mode, drop lines from unlisted components.
                    if log_components_only && component_filter_level.is_none() {
                        return;
                    }

                    // Apply the per-component level override, if any.
                    if let Some(&filter_level) = component_filter_level
                        && line.level > filter_level
                    {
                        return;
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(file_writer) = file_writer_opt
                        && file_writer.enabled(&wrapper.line)
                    {
                        if file_writer.json_format {
                            file_writer.write(&wrapper.get_json());
                        } else {
                            file_writer.write(wrapper.get_string());
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // Close is handled by the outer receive loop, not here.
                }
            }
        };

        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(_) | LogEvent::Flush => process_event(
                    event,
                    &mut stdout_writer,
                    &mut stderr_writer,
                    &mut file_writer_opt,
                ),
                LogEvent::Close => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    // Drain any events still queued behind the close request
                    // so no backlog is lost during shutdown.
                    while let Ok(evt) = rx.try_recv() {
                        match evt {
                            LogEvent::Close => (),
                            _ => process_event(
                                evt,
                                &mut stdout_writer,
                                &mut stderr_writer,
                                &mut file_writer_opt,
                            ),
                        }
                    }

                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}
589
/// Gracefully shuts down the logging system: stops accepting new records,
/// asks the logging thread to drain and close, then joins it.
pub(crate) fn shutdown_graceful() {
    // Stop accepting new log records immediately.
    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
    log::set_max_level(log::LevelFilter::Off);

    // Request the logging thread to drain remaining events and exit.
    if let Some(tx) = LOGGER_TX.get() {
        let _ = tx.send(LogEvent::Close);
    }

    // Join the logging thread unless we ARE the logging thread (would deadlock).
    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
        && let Some(handle) = handle_guard.take()
        && handle.thread().id() != std::thread::current().id()
    {
        let _ = handle.join();
    }

    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
}
612
/// Logs `message` at `level`, attaching `color` and `component` as structured
/// key-values, dispatching through the global `log` facade.
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
    // Encode the color as a u8 key-value; decoded again in `Logger::log`.
    let color = Value::from(color as u8);

    match level {
        LogLevel::Off => {}
        LogLevel::Trace => {
            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Debug => {
            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Info => {
            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Warning => {
            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Error => {
            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
    }
}
635
/// RAII guard keeping the logging system alive; when the last active guard
/// is dropped the logging thread is closed and joined.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to issue Flush/Close events on drop.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
662
663impl LogGuard {
664 #[must_use]
672 pub fn new() -> Option<Self> {
673 LOGGER_TX.get().map(|tx| {
674 LOGGING_GUARDS_ACTIVE
675 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
676 if count == u8::MAX {
677 None } else {
679 Some(count + 1)
680 }
681 })
682 .expect("Maximum number of active LogGuards (255) exceeded");
683
684 Self { tx: tx.clone() }
685 })
686 }
687}
688
impl Drop for LogGuard {
    fn drop(&mut self) {
        // Decrement the active-guard refcount; underflow indicates a bug.
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                if count == 0 {
                    panic!("LogGuard reference count underflow");
                }
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // Last guard dropped: shut the logging system down.
        // NOTE(review): the second condition re-reads the counter and could
        // race with a concurrently created guard - confirm this is intended.
        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
            // Stop accepting new records before asking the thread to close.
            LOGGING_BYPASSED.store(true, Ordering::SeqCst);

            log::set_max_level(log::LevelFilter::Off);

            let _ = self.tx.send(LogEvent::Close);

            // Join the logging thread unless dropping from that same thread
            // (joining ourselves would deadlock).
            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
                && let Some(handle) = handle_guard.take()
            {
                if handle.thread().id() != std::thread::current().id() {
                    let _ = handle.join();
                }
            }

            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
        } else {
            // Other guards remain active; just flush buffered output.
            let _ = self.tx.send(LogEvent::Flush);
        }
    }
}
731
732#[cfg(test)]
736mod tests {
737 use std::{collections::HashMap, thread::sleep, time::Duration};
738
739 use log::LevelFilter;
740 use nautilus_core::UUID4;
741 use nautilus_model::identifiers::TraderId;
742 use rstest::*;
743 use serde_json::Value;
744 use tempfile::tempdir;
745 use ustr::Ustr;
746
747 use super::*;
748 use crate::{
749 enums::LogColor,
750 logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
751 testing::wait_until,
752 };
753
    #[rstest]
    fn log_message_serialization() {
        // A `LogLine` should serialize to JSON with its plain field values.
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }
771
    #[rstest]
    fn log_config_parsing() {
        // Spec with stdout/fileout levels, a flag, and a component override.
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                log_components_only: false,
                is_colored: true,
                print_config: false,
            }
        );
    }
792
    #[rstest]
    fn log_config_parsing2() {
        // Trailing semicolons are tolerated; `print_config` flag is parsed.
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                log_components_only: false,
                is_colored: true,
                print_config: true,
            }
        );
    }
808
    #[rstest]
    fn log_config_parsing_with_log_components_only() {
        // The `log_components_only` flag restricts output to listed components.
        let config =
            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Off,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Debug
                )]),
                log_components_only: true,
                is_colored: true,
                print_config: false,
            }
        );
    }
828
829 mod serial_tests {
831 use super::*;
832
        #[rstest]
        fn test_logging_to_file() {
            // Logging with file output enabled should produce exactly the
            // expected plain-text line in the file.
            let config = LoggerConfig {
                fileout_level: LevelFilter::Debug,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            // Fix the clock so the logged timestamp is deterministic.
            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            // Wait for the logging thread to create the file.
            wait_until(
                || {
                    std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .any(|entry| entry.path().is_file())
                },
                Duration::from_secs(3),
            );

            // Dropping the guard flushes and closes the logging thread.
            drop(log_guard);

            wait_until(
                || {
                    let log_file_path = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                        .expect("No files found in directory")
                        .path();
                    dbg!(&log_file_path);
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
            );
        }
896
        #[rstest]
        fn test_shutdown_drains_backlog_tail() {
            // Every message logged before the guard is dropped must still be
            // written to the file - shutdown drains the queued backlog.
            let config = LoggerConfig {
                stdout_level: LevelFilter::Off,
                fileout_level: LevelFilter::Info,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-TAIL"),
                UUID4::new(),
                config,
                file_config,
            )
            .expect("Failed to initialize logger");

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_700_000_000_000_000);

            // Queue a sizeable backlog before shutting down.
            const N: usize = 1000;
            for i in 0..N {
                log::info!(component = "TailDrain"; "BacklogTest {i}");
            }

            drop(log_guard);

            let mut count = 0usize;
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
                            count = contents
                                .lines()
                                .filter(|l| l.contains("BacklogTest "))
                                .count();
                            count >= N
                        } else {
                            false
                        }
                    } else {
                        false
                    }
                },
                Duration::from_secs(5),
            );

            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
        }
961
        #[rstest]
        fn test_log_component_level_filtering() {
            // RiskEngine is overridden to Error, so an INFO line from that
            // component must be filtered out of the file.
            let config =
                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            drop(log_guard);

            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        let log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.contains("RiskEngine")
                    } else {
                        false
                    }
                },
                Duration::from_secs(3),
            );

            assert!(
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file()),
                "Log file exists"
            );
        }
1016
        #[rstest]
        fn test_logging_to_file_in_json_format() {
            // With `file_format: json`, lines are written as one JSON object
            // per line with a stable key order.
            let config =
                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                    .unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                file_format: Some("json".to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            drop(log_guard);

            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.is_empty()
                    } else {
                        false
                    }
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
            );
        }
1072
        #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
        #[rstest]
        fn test_file_rotation_and_backup_limits() {
            // With a tiny max file size, each oversized message should rotate
            // the file, and old backups should be capped at `max_backups`.
            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let dir_path = temp_dir.path().to_str().unwrap().to_string();

            let max_backups = 3;
            let max_file_size = 100;
            let file_config = FileWriterConfig {
                directory: Some(dir_path.clone()),
                file_name: None,
                file_format: Some("log".to_string()),
                file_rotate: Some((max_file_size, max_backups).into()),
            };

            let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            log::info!(
                component = "Test";
                "Test log message with enough content to exceed our small max file size limit"
            );

            sleep(Duration::from_millis(100));

            let files: Vec<_> = std::fs::read_dir(&dir_path)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
                .collect();

            assert_eq!(files.len(), 1);

            log::info!(
                component = "Test";
                "Test log message with enough content to exceed our small max file size limit"
            );

            sleep(Duration::from_millis(100));

            let files: Vec<_> = std::fs::read_dir(&dir_path)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
                .collect();

            assert_eq!(files.len(), 2);

            // Log enough to exceed the backup cap several times over.
            for _ in 0..5 {
                log::info!(
                    component = "Test";
                    "Test log message with enough content to exceed our small max file size limit"
                );

                sleep(Duration::from_millis(100));
            }

            let files: Vec<_> = std::fs::read_dir(&dir_path)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
                .collect();

            // NOTE(review): the condition asserts exactly max_backups + 1 files
            // but the message says "at most" - confirm which is intended.
            assert!(
                files.len() == max_backups as usize + 1,
                "Expected at most {} log files, found {}",
                max_backups,
                files.len()
            );

            drop(log_guard);
            drop(temp_dir);
        }
1162 }
1163}