1use std::{
17 collections::HashMap,
18 env,
19 fmt::Display,
20 str::FromStr,
21 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27 kv::{ToValue, Value},
28 set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31 UUID4, UnixNanos,
32 datetime::unix_nanos_to_iso8601,
33 time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41 enums::{LogColor, LogLevel},
42 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
/// Name assigned to the dedicated background logging thread.
const LOGGING: &str = "logging";
/// Key used to pass the log color through the `log` crate's key-value pairs.
const KV_COLOR: &str = "color";
/// Key used to pass the component name through the `log` crate's key-value pairs.
const KV_COMPONENT: &str = "component";

// Global sender for the logging channel; set once during logger initialization
// so `LogGuard`s and graceful shutdown can reach the background thread.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

// Join handle for the background logging thread, taken (and joined) on shutdown.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
/// Configuration controlling log levels, per-component filtering, and output
/// options for the logger.
pub struct LoggerConfig {
    /// Maximum level emitted to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum level emitted to the log file (`Off` disables file output).
    pub fileout_level: LevelFilter,
    /// Per-component level overrides, keyed by component name.
    component_level: HashMap<Ustr, LevelFilter>,
    /// When `true`, only components with an explicit level override are logged.
    pub log_components_only: bool,
    /// When `true`, ANSI color codes are used for terminal output.
    pub is_colored: bool,
    /// When `true`, the resolved configuration is printed at initialization.
    pub print_config: bool,
}
74
75impl Default for LoggerConfig {
76 fn default() -> Self {
78 Self {
79 stdout_level: LevelFilter::Info,
80 fileout_level: LevelFilter::Off,
81 component_level: HashMap::new(),
82 log_components_only: false,
83 is_colored: true,
84 print_config: false,
85 }
86 }
87}
88
89impl LoggerConfig {
90 #[must_use]
92 pub const fn new(
93 stdout_level: LevelFilter,
94 fileout_level: LevelFilter,
95 component_level: HashMap<Ustr, LevelFilter>,
96 log_components_only: bool,
97 is_colored: bool,
98 print_config: bool,
99 ) -> Self {
100 Self {
101 stdout_level,
102 fileout_level,
103 component_level,
104 log_components_only,
105 is_colored,
106 print_config,
107 }
108 }
109
110 pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
114 let mut config = Self::default();
115 for kv in spec.split(';') {
116 let kv = kv.trim();
117 if kv.is_empty() {
118 continue;
119 }
120 let kv_lower = kv.to_lowercase(); if kv_lower == "log_components_only" {
123 config.log_components_only = true;
124 } else if kv_lower == "is_colored" {
125 config.is_colored = true;
126 } else if kv_lower == "print_config" {
127 config.print_config = true;
128 } else {
129 let parts: Vec<&str> = kv.split('=').collect();
130 if parts.len() != 2 {
131 anyhow::bail!("Invalid spec pair: {}", kv);
132 }
133 let k = parts[0].trim(); let v = parts[1].trim(); let lvl = LevelFilter::from_str(v)
136 .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
137 let k_lower = k.to_lowercase(); match k_lower.as_str() {
139 "stdout" => config.stdout_level = lvl,
140 "fileout" => config.fileout_level = lvl,
141 _ => {
142 config.component_level.insert(Ustr::from(k), lvl);
143 }
144 }
145 }
146 }
147 Ok(config)
148 }
149
150 pub fn from_env() -> anyhow::Result<Self> {
156 let spec = env::var("NAUTILUS_LOG")?;
157 Self::from_spec(&spec)
158 }
159}
160
#[derive(Debug)]
/// Logging facade installed as the global `log` implementation; it forwards
/// records over a channel to a background writer thread.
pub struct Logger {
    /// Configuration used for level and component filtering.
    pub config: LoggerConfig,
    /// Sender side of the channel to the background logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
173
#[derive(Debug)]
/// Events sent over the channel to the background logging thread.
pub enum LogEvent {
    /// A log line to be written to the enabled writers.
    Log(LogLine),
    /// Request to flush all writers.
    Flush,
    /// Request to drain remaining events and terminate the logging thread.
    Close,
}
184
#[derive(Clone, Debug, Serialize, Deserialize)]
/// A single log record with its timestamp, severity, and source component.
pub struct LogLine {
    /// Time of the event in UNIX nanoseconds.
    pub timestamp: UnixNanos,
    /// Severity level of the record.
    pub level: Level,
    /// Color used when rendering the record to a terminal.
    pub color: LogColor,
    /// Name of the component that emitted the record.
    pub component: Ustr,
    /// The log message body.
    pub message: String,
}
199
200impl Display for LogLine {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
203 }
204}
205
#[derive(Clone, Debug)]
/// Wraps a [`LogLine`] with lazily built, cached formatted representations.
pub struct LogLineWrapper {
    /// The underlying log line.
    line: LogLine,
    /// Cached plain-text rendering (built on first use).
    cache: Option<String>,
    /// Cached ANSI-colored rendering (built on first use).
    colored: Option<String>,
    /// Trader ID included in the formatted output.
    trader_id: Ustr,
}
223
224impl LogLineWrapper {
225 #[must_use]
227 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
228 Self {
229 line,
230 cache: None,
231 colored: None,
232 trader_id,
233 }
234 }
235
236 pub fn get_string(&mut self) -> &str {
241 self.cache.get_or_insert_with(|| {
242 format!(
243 "{} [{}] {}.{}: {}\n",
244 unix_nanos_to_iso8601(self.line.timestamp),
245 self.line.level,
246 self.trader_id,
247 &self.line.component,
248 &self.line.message,
249 )
250 })
251 }
252
253 pub fn get_colored(&mut self) -> &str {
259 self.colored.get_or_insert_with(|| {
260 format!(
261 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
262 unix_nanos_to_iso8601(self.line.timestamp),
263 &self.line.color.as_ansi(),
264 self.line.level,
265 self.trader_id,
266 &self.line.component,
267 &self.line.message,
268 )
269 })
270 }
271
272 #[must_use]
281 pub fn get_json(&self) -> String {
282 let json_string =
283 serde_json::to_string(&self).expect("Error serializing log event to string");
284 format!("{json_string}\n")
285 }
286}
287
288impl Serialize for LogLineWrapper {
289 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290 where
291 S: Serializer,
292 {
293 let mut json_obj = IndexMap::new();
294 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
295 json_obj.insert("timestamp".to_string(), timestamp);
296 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
297 json_obj.insert("level".to_string(), self.line.level.to_string());
298 json_obj.insert("color".to_string(), self.line.color.to_string());
299 json_obj.insert("component".to_string(), self.line.component.to_string());
300 json_obj.insert("message".to_string(), self.line.message.to_string());
301
302 json_obj.serialize(serializer)
303 }
304}
305
impl Log for Logger {
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        // Nothing is enabled once logging has been bypassed (shutdown);
        // otherwise errors always pass, and other levels pass if either
        // sink's filter would accept them.
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            // Use the realtime clock or the static (test) clock depending on
            // the global mode flag.
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            // Color is passed as a u8 discriminant via the "color" key-value;
            // fall back to the level's default color when absent.
            let color: LogColor = key_values
                .get(KV_COLOR.into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            // Component comes from the "component" key-value, defaulting to
            // the record's target (module path) when absent.
            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            // On failure, `send` returns the rejected event; destructure it
            // back out so the dropped line can be reported to stderr.
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        // Skip flushing entirely once logging has been bypassed (shutdown).
        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
            return;
        }

        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event: {e}");
        }
    }
}
356
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logger using the configuration read from the
    /// `NAUTILUS_LOG` environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if the environment variable is unset, its spec fails
    /// to parse, or logger initialization fails.
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Installs the logger as the global `log` implementation and spawns the
    /// background thread that writes events to stdout/stderr/file.
    ///
    /// Returns a [`LogGuard`] whose drop triggers logger shutdown once the
    /// last guard is released.
    ///
    /// # Errors
    ///
    /// Returns an error if a global logger is already installed, the logging
    /// thread fails to spawn, or the guard cannot be created.
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        set_boxed_logger(Box::new(logger))?;

        // Publish the sender globally so `LogGuard`s and graceful shutdown
        // can reach the channel; only the first initialization may set it.
        if LOGGER_TX.set(tx.clone()).is_err() {
            debug_assert!(
                false,
                "LOGGER_TX already set - re-initialization not supported"
            );
        }

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        // Spawn the background thread that owns the receiver and the writers.
        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        // Stash the join handle so shutdown can join the worker thread.
        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
            debug_assert!(
                handle_guard.is_none(),
                "LOGGER_HANDLE already set - re-initialization not supported"
            );
            *handle_guard = Some(handle);
        }

        // Let all records through the `log` facade; fine-grained filtering
        // happens in `enabled` and in the background thread.
        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        LogGuard::new()
            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
    }

    /// Event loop run on the background logging thread: receives events,
    /// applies per-component filtering, and writes to the enabled sinks
    /// until a `Close` event is processed.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            log_components_only,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // File output is only created when a fileout level is configured.
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(
                trader_id.clone(),
                instance_id.clone(),
                file_config.clone(),
                fileout_level,
            )
        };

        // Shared event processor, also reused when draining the backlog
        // after a `Close` event.
        let process_event = |event: LogEvent,
                             stdout_writer: &mut StdoutWriter,
                             stderr_writer: &mut StderrWriter,
                             file_writer_opt: &mut Option<FileWriter>| {
            match event {
                LogEvent::Log(line) => {
                    let component_filter_level = component_level.get(&line.component);

                    // In components-only mode, drop lines from components
                    // without an explicit override.
                    if log_components_only && component_filter_level.is_none() {
                        return;
                    }

                    // Drop lines below the component's override level.
                    if let Some(&filter_level) = component_filter_level
                        && line.level > filter_level
                    {
                        return;
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(file_writer) = file_writer_opt
                        && file_writer.enabled(&wrapper.line)
                    {
                        if file_writer.json_format {
                            file_writer.write(&wrapper.get_json());
                        } else {
                            file_writer.write(wrapper.get_string());
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // Close is handled by the outer receive loop, not here.
                }
            }
        };

        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(_) | LogEvent::Flush => process_event(
                    event,
                    &mut stdout_writer,
                    &mut stderr_writer,
                    &mut file_writer_opt,
                ),
                LogEvent::Close => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    // Drain any backlog still queued behind the Close event
                    // so no pre-shutdown lines are lost.
                    while let Ok(evt) = rx.try_recv() {
                        match evt {
                            LogEvent::Close => (),
                            _ => process_event(
                                evt,
                                &mut stdout_writer,
                                &mut stderr_writer,
                                &mut file_writer_opt,
                            ),
                        }
                    }

                    // Final flush after the backlog has been written.
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}
589
590pub(crate) fn shutdown_graceful() {
599 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
601 log::set_max_level(log::LevelFilter::Off);
602
603 if let Some(tx) = LOGGER_TX.get() {
605 let _ = tx.send(LogEvent::Close);
606 }
607
608 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
609 && let Some(handle) = handle_guard.take()
610 && handle.thread().id() != std::thread::current().id()
611 {
612 let _ = handle.join();
613 }
614
615 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
616}
617
/// Logs `message` at `level`, attaching `color` and `component` as structured
/// key-values on the record (see `KV_COLOR` / `KV_COMPONENT`).
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
    // The color travels through the `log` kv mechanism as its u8 discriminant.
    let color = Value::from(color as u8);

    match level {
        LogLevel::Off => {}
        LogLevel::Trace => {
            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Debug => {
            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Info => {
            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Warning => {
            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Error => {
            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
    }
}
640
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
/// RAII guard that keeps the logging system alive; when the last active
/// guard is dropped the background logging thread is shut down and joined.
pub struct LogGuard {
    /// Sender used to issue `Flush`/`Close` events on drop.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
675
676impl LogGuard {
677 #[must_use]
685 pub fn new() -> Option<Self> {
686 LOGGER_TX.get().map(|tx| {
687 LOGGING_GUARDS_ACTIVE
688 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
689 if count == u8::MAX {
690 None } else {
692 Some(count + 1)
693 }
694 })
695 .expect("Maximum number of active LogGuards (255) exceeded");
696
697 Self { tx: tx.clone() }
698 })
699 }
700}
701
impl Drop for LogGuard {
    /// Decrements the active-guard count; the last guard to drop shuts the
    /// logging system down, while earlier drops only request a flush.
    fn drop(&mut self) {
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                if count == 0 {
                    panic!("LogGuard reference count underflow");
                }
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // Only the final guard (count 1 -> 0) performs shutdown.
        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
            // Stop accepting new records before draining.
            LOGGING_BYPASSED.store(true, Ordering::SeqCst);

            log::set_max_level(log::LevelFilter::Off);

            // Ask the background thread to drain its backlog and terminate.
            let _ = self.tx.send(LogEvent::Close);

            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
                && let Some(handle) = handle_guard.take()
            {
                // Avoid self-join deadlock if dropped on the logging thread.
                if handle.thread().id() != std::thread::current().id() {
                    let _ = handle.join();
                }
            }

            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
        } else {
            // Other guards remain active: just request a flush.
            let _ = self.tx.send(LogEvent::Flush);
        }
    }
}
748
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, thread::sleep, time::Duration};

    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    // `LogLine` serializes to JSON with the expected field values.
    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }

    // Spec parsing: mixed flags, sink levels, and a component override.
    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                log_components_only: false,
                is_colored: true,
                print_config: false,
            }
        );
    }

    // Spec parsing: trailing separator and the `print_config` flag.
    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                log_components_only: false,
                is_colored: true,
                print_config: true,
            }
        );
    }

    // Spec parsing: `log_components_only` flag together with an override.
    #[rstest]
    fn log_config_parsing_with_log_components_only() {
        let config =
            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Off,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Debug
                )]),
                log_components_only: true,
                is_colored: true,
                print_config: false,
            }
        );
    }

    // These tests install a global logger, so they cannot run concurrently
    // with one another.
    mod serial_tests {
        use super::*;

        // End-to-end: a record is written to a file with the exact
        // plain-text format.
        #[rstest]
        fn test_logging_to_file() {
            let config = LoggerConfig {
                fileout_level: LevelFilter::Debug,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            wait_until(
                || {
                    std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .any(|entry| entry.path().is_file())
                },
                Duration::from_secs(3),
            );

            // Dropping the guard triggers shutdown and a final flush.
            drop(log_guard);
            wait_until(
                || {
                    let log_file_path = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                        .expect("No files found in directory")
                        .path();
                    dbg!(&log_file_path);
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
            );
        }

        // All messages queued before shutdown must be drained to the file.
        #[rstest]
        fn test_shutdown_drains_backlog_tail() {
            let config = LoggerConfig {
                stdout_level: LevelFilter::Off,
                fileout_level: LevelFilter::Info,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-TAIL"),
                UUID4::new(),
                config,
                file_config,
            )
            .expect("Failed to initialize logger");

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_700_000_000_000_000);

            const N: usize = 1000;
            for i in 0..N {
                log::info!(component = "TailDrain"; "BacklogTest {i}");
            }

            drop(log_guard);

            let mut count = 0usize;
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
                            count = contents
                                .lines()
                                .filter(|l| l.contains("BacklogTest "))
                                .count();
                            count >= N
                        } else {
                            false
                        }
                    } else {
                        false
                    }
                },
                Duration::from_secs(5),
            );

            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
        }

        // A component override stricter than the record's level must filter
        // the record out of the file.
        #[rstest]
        fn test_log_component_level_filtering() {
            let config =
                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            drop(log_guard);
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        let log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.contains("RiskEngine")
                    } else {
                        false
                    }
                },
                Duration::from_secs(3),
            );

            assert!(
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file()),
                "Log file exists"
            );
        }

        // End-to-end: JSON file format produces the exact serialized object.
        #[rstest]
        fn test_logging_to_file_in_json_format() {
            let config =
                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                    .unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                file_format: Some("json".to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            drop(log_guard);
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.is_empty()
                    } else {
                        false
                    }
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
            );
        }

        // Rotation: file count grows per rotation and is capped by the
        // backup limit.
        #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
        #[rstest]
        fn test_file_rotation_and_backup_limits() {
            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let dir_path = temp_dir.path().to_str().unwrap().to_string();

            // Tiny max file size so every message forces a rotation.
            let max_backups = 3;
            let max_file_size = 100;
            let file_config = FileWriterConfig {
                directory: Some(dir_path.clone()),
                file_name: None,
                file_format: Some("log".to_string()),
                file_rotate: Some((max_file_size, max_backups).into()),
            };

            let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            log::info!(
                component = "Test";
                "Test log message with enough content to exceed our small max file size limit"
            );

            sleep(Duration::from_millis(100));

            let files: Vec<_> = std::fs::read_dir(&dir_path)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
                .collect();

            assert_eq!(files.len(), 1);

            log::info!(
                component = "Test";
                "Test log message with enough content to exceed our small max file size limit"
            );

            sleep(Duration::from_millis(100));

            let files: Vec<_> = std::fs::read_dir(&dir_path)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
                .collect();

            assert_eq!(files.len(), 2);

            for _ in 0..5 {
                log::info!(
                    component = "Test";
                    "Test log message with enough content to exceed our small max file size limit"
                );

                sleep(Duration::from_millis(100));
            }

            let files: Vec<_> = std::fs::read_dir(&dir_path)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
                .collect();

            assert!(
                files.len() == max_backups as usize + 1,
                "Expected at most {} log files, found {}",
                max_backups,
                files.len()
            );

            drop(log_guard);
            drop(temp_dir);
        }
    }
}