1use std::{
17 collections::HashMap,
18 env,
19 fmt::Display,
20 str::FromStr,
21 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27 kv::{ToValue, Value},
28 set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31 UUID4, UnixNanos,
32 datetime::unix_nanos_to_iso8601,
33 time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
40use crate::{
41 enums::{LogColor, LogLevel},
42 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
/// Name assigned to the background logging thread.
const LOGGING: &str = "logging";
/// Structured-logging key carrying an explicit color override for a record.
const KV_COLOR: &str = "color";
/// Structured-logging key carrying the component name for a record.
const KV_COMPONENT: &str = "component";

/// Global sending side of the log-event channel, set once at initialization;
/// used by `LogGuard::new` and `shutdown_graceful` to reach the logging thread.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Join handle for the background logging thread, taken on shutdown so the
/// thread can be joined exactly once.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
54
/// Configuration controlling log levels, per-component filtering, and output
/// styling for the [`Logger`].
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum level written to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum level written to the log file; `Off` disables file output.
    pub fileout_level: LevelFilter,
    /// Per-component maximum levels, keyed by component name.
    component_level: HashMap<Ustr, LevelFilter>,
    /// If `true`, only components with an entry in `component_level` are logged.
    pub log_components_only: bool,
    /// If `true`, ANSI color codes are applied to console output.
    pub is_colored: bool,
    /// If `true`, the resolved configuration is printed during initialization.
    pub print_config: bool,
}
74
75impl Default for LoggerConfig {
76 fn default() -> Self {
78 Self {
79 stdout_level: LevelFilter::Info,
80 fileout_level: LevelFilter::Off,
81 component_level: HashMap::new(),
82 log_components_only: false,
83 is_colored: true,
84 print_config: false,
85 }
86 }
87}
88
89impl LoggerConfig {
90 #[must_use]
92 pub const fn new(
93 stdout_level: LevelFilter,
94 fileout_level: LevelFilter,
95 component_level: HashMap<Ustr, LevelFilter>,
96 log_components_only: bool,
97 is_colored: bool,
98 print_config: bool,
99 ) -> Self {
100 Self {
101 stdout_level,
102 fileout_level,
103 component_level,
104 log_components_only,
105 is_colored,
106 print_config,
107 }
108 }
109
110 pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
114 let mut config = Self::default();
115 for kv in spec.split(';') {
116 let kv = kv.trim();
117 if kv.is_empty() {
118 continue;
119 }
120 let kv_lower = kv.to_lowercase(); if kv_lower == "log_components_only" {
123 config.log_components_only = true;
124 } else if kv_lower == "is_colored" {
125 config.is_colored = true;
126 } else if kv_lower == "print_config" {
127 config.print_config = true;
128 } else {
129 let parts: Vec<&str> = kv.split('=').collect();
130 if parts.len() != 2 {
131 anyhow::bail!("Invalid spec pair: {}", kv);
132 }
133 let k = parts[0].trim(); let v = parts[1].trim(); let lvl = LevelFilter::from_str(v)
136 .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
137 let k_lower = k.to_lowercase(); match k_lower.as_str() {
139 "stdout" => config.stdout_level = lvl,
140 "fileout" => config.fileout_level = lvl,
141 _ => {
142 config.component_level.insert(Ustr::from(k), lvl);
143 }
144 }
145 }
146 }
147 Ok(config)
148 }
149
150 pub fn from_env() -> anyhow::Result<Self> {
156 let spec = env::var("NAUTILUS_LOG")?;
157 Self::from_spec(&spec)
158 }
159}
160
/// Logger implementing the [`Log`] facade by forwarding records as
/// [`LogEvent`]s over an MPSC channel to a background writer thread.
#[derive(Debug)]
pub struct Logger {
    /// Configuration governing levels, filtering, and output styling.
    pub config: LoggerConfig,
    /// Sending side of the channel consumed by the background logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
173
/// An event processed by the background logging thread.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line to be written to the enabled writers.
    Log(LogLine),
    /// Request to flush all writers.
    Flush,
    /// Request to drain remaining events, flush, and stop the logging thread.
    Close,
}
184
/// A single log record captured from the `log` facade.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// Time of the record in UNIX nanoseconds.
    pub timestamp: UnixNanos,
    /// Severity level of the record.
    pub level: Level,
    /// Color applied when the record is rendered by a colored writer.
    pub color: LogColor,
    /// Component name the record is attributed to.
    pub component: Ustr,
    /// The formatted log message.
    pub message: String,
}
199
200impl Display for LogLine {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
203 }
204}
205
/// A [`LogLine`] paired with lazily-built, cached render strings.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line.
    line: LogLine,
    /// Cached plain-text rendering, built on first use.
    cache: Option<String>,
    /// Cached ANSI-colored rendering, built on first use.
    colored: Option<String>,
    /// Trader ID included in the rendered output.
    trader_id: Ustr,
}
223
224impl LogLineWrapper {
225 #[must_use]
227 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
228 Self {
229 line,
230 cache: None,
231 colored: None,
232 trader_id,
233 }
234 }
235
236 pub fn get_string(&mut self) -> &str {
241 self.cache.get_or_insert_with(|| {
242 format!(
243 "{} [{}] {}.{}: {}\n",
244 unix_nanos_to_iso8601(self.line.timestamp),
245 self.line.level,
246 self.trader_id,
247 &self.line.component,
248 &self.line.message,
249 )
250 })
251 }
252
253 pub fn get_colored(&mut self) -> &str {
259 self.colored.get_or_insert_with(|| {
260 format!(
261 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
262 unix_nanos_to_iso8601(self.line.timestamp),
263 &self.line.color.as_ansi(),
264 self.line.level,
265 self.trader_id,
266 &self.line.component,
267 &self.line.message,
268 )
269 })
270 }
271
272 #[must_use]
281 pub fn get_json(&self) -> String {
282 let json_string =
283 serde_json::to_string(&self).expect("Error serializing log event to string");
284 format!("{json_string}\n")
285 }
286}
287
288impl Serialize for LogLineWrapper {
289 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
290 where
291 S: Serializer,
292 {
293 let mut json_obj = IndexMap::new();
294 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
295 json_obj.insert("timestamp".to_string(), timestamp);
296 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
297 json_obj.insert("level".to_string(), self.line.level.to_string());
298 json_obj.insert("color".to_string(), self.line.color.to_string());
299 json_obj.insert("component".to_string(), self.line.component.to_string());
300 json_obj.insert("message".to_string(), self.line.message.clone());
301
302 json_obj.serialize(serializer)
303 }
304}
305
impl Log for Logger {
    /// A record is enabled unless logging is bypassed; `Error` records are
    /// always enabled, others must pass the stdout or fileout level.
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    /// Converts the record into a [`LogLine`] and sends it to the background
    /// logging thread.
    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            // Use the realtime clock when live, otherwise the static
            // (externally controlled) clock.
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            // Optional "color" key-value overrides the level-derived color.
            let color: LogColor = key_values
                .get(KV_COLOR.into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            // Optional "component" key-value; falls back to the record target.
            let component = key_values.get(KV_COMPONENT.into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            // A send error returns the event; fall back to stderr so the
            // message is not silently lost when the receiver has closed.
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    /// Requests a flush of all writers on the background logging thread.
    fn flush(&self) {
        // Skip when logging is bypassed (e.g. during shutdown).
        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
            return;
        }

        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event: {e}");
        }
    }
}
356
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logger using the spec from the `NAUTILUS_LOG`
    /// environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if `NAUTILUS_LOG` is unset or invalid, or if logger
    /// initialization fails.
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Installs this logger as the global `log` implementation and spawns the
    /// background thread which writes events to stdout/stderr and, optionally,
    /// a file.
    ///
    /// # Errors
    ///
    /// Returns an error if a global logger is already installed, the logging
    /// thread cannot be spawned, or no [`LogGuard`] can be created.
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        set_boxed_logger(Box::new(logger))?;

        // Publish the sender globally so guards and graceful shutdown can
        // signal the logging thread; re-initialization is not supported.
        if LOGGER_TX.set(tx).is_err() {
            debug_assert!(
                false,
                "LOGGER_TX already set - re-initialization not supported"
            );
        }

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        // The spawned thread owns the receiving end of the channel.
        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        // Store the join handle so the thread can be joined on shutdown.
        if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
            debug_assert!(
                handle_guard.is_none(),
                "LOGGER_HANDLE already set - re-initialization not supported"
            );
            *handle_guard = Some(handle);
        }

        // Accept everything at the facade; fine-grained filtering happens in
        // `enabled` and in the per-writer level checks.
        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        LogGuard::new()
            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
    }

    /// Event loop run on the background logging thread: receives [`LogEvent`]s
    /// and dispatches them to the stdout, stderr, and optional file writers
    /// until a `Close` event is received.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            log_components_only,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // File output is only set up when a fileout level is configured.
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
        };

        // Shared handling for Log and Flush events; Close is handled by the
        // outer receive loop so it can drain the backlog and terminate.
        let process_event = |event: LogEvent,
                             stdout_writer: &mut StdoutWriter,
                             stderr_writer: &mut StderrWriter,
                             file_writer_opt: &mut Option<FileWriter>| {
            match event {
                LogEvent::Log(line) => {
                    let component_filter_level = component_level.get(&line.component);

                    // In components-only mode, drop lines from unlisted components.
                    if log_components_only && component_filter_level.is_none() {
                        return;
                    }

                    // Apply any per-component level override.
                    if let Some(&filter_level) = component_filter_level
                        && line.level > filter_level
                    {
                        return;
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(file_writer) = file_writer_opt
                        && file_writer.enabled(&wrapper.line)
                    {
                        if file_writer.json_format {
                            file_writer.write(&wrapper.get_json());
                        } else {
                            file_writer.write(wrapper.get_string());
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // No-op here: Close is handled by the outer loop.
                }
            }
        };

        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(_) | LogEvent::Flush => process_event(
                    event,
                    &mut stdout_writer,
                    &mut stderr_writer,
                    &mut file_writer_opt,
                ),
                LogEvent::Close => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    // Drain any backlog still queued so pre-shutdown messages
                    // are not lost; additional Close events are ignored.
                    while let Ok(evt) = rx.try_recv() {
                        match evt {
                            LogEvent::Close => (),
                            _ => process_event(
                                evt,
                                &mut stdout_writer,
                                &mut stderr_writer,
                                &mut file_writer_opt,
                            ),
                        }
                    }

                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}
584
/// Gracefully shuts down the logging subsystem: stops accepting new records,
/// signals the logging thread to drain and close, and joins it.
pub(crate) fn shutdown_graceful() {
    // Bypass further logging before closing the channel.
    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
    log::set_max_level(log::LevelFilter::Off);

    if let Some(tx) = LOGGER_TX.get() {
        let _ = tx.send(LogEvent::Close);
    }

    // Join the writer thread, skipping the join if we are already running on
    // it (avoids self-join deadlock).
    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
        && let Some(handle) = handle_guard.take()
        && handle.thread().id() != std::thread::current().id()
    {
        let _ = handle.join();
    }

    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
}
612
613pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
614 let color = Value::from(color as u8);
615
616 match level {
617 LogLevel::Off => {}
618 LogLevel::Trace => {
619 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
620 }
621 LogLevel::Debug => {
622 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
623 }
624 LogLevel::Info => {
625 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
626 }
627 LogLevel::Warning => {
628 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
629 }
630 LogLevel::Error => {
631 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
632 }
633 }
634}
635
/// RAII guard for the logging subsystem.
///
/// Creating a guard increments a global active-guard count; dropping the last
/// guard shuts the logging thread down (see the `Drop` impl).
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to request flushes and, for the last guard, shutdown.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
670
671impl LogGuard {
672 #[must_use]
680 pub fn new() -> Option<Self> {
681 LOGGER_TX.get().map(|tx| {
682 LOGGING_GUARDS_ACTIVE
683 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
684 if count == u8::MAX {
685 None } else {
687 Some(count + 1)
688 }
689 })
690 .expect("Maximum number of active LogGuards (255) exceeded");
691
692 Self { tx: tx.clone() }
693 })
694 }
695}
696
impl Drop for LogGuard {
    /// Decrements the active-guard count; the last guard to drop shuts the
    /// logging thread down, earlier guards only request a flush.
    fn drop(&mut self) {
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                if count == 0 {
                    panic!("LogGuard reference count underflow");
                }
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // NOTE(review): `previous_count == 1` already implies the count just
        // reached zero; the extra load presumably guards against a concurrent
        // `LogGuard::new` racing the shutdown - confirm.
        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
            // Stop accepting new records before closing the channel.
            LOGGING_BYPASSED.store(true, Ordering::SeqCst);

            log::set_max_level(log::LevelFilter::Off);

            let _ = self.tx.send(LogEvent::Close);

            // Join the writer thread unless we are being dropped on it.
            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
                && let Some(handle) = handle_guard.take()
            {
                if handle.thread().id() != std::thread::current().id() {
                    let _ = handle.join();
                }
            }

            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
        } else {
            // Other guards still active: just flush buffered output.
            let _ = self.tx.send(LogEvent::Flush);
        }
    }
}
743
744#[cfg(test)]
748mod tests {
749 use std::{collections::HashMap, thread::sleep, time::Duration};
750
751 use log::LevelFilter;
752 use nautilus_core::UUID4;
753 use nautilus_model::identifiers::TraderId;
754 use rstest::*;
755 use serde_json::Value;
756 use tempfile::tempdir;
757 use ustr::Ustr;
758
759 use super::*;
760 use crate::{
761 enums::LogColor,
762 logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
763 testing::wait_until,
764 };
765
    /// A `LogLine` should serialize to JSON with its fields intact.
    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }
783
    /// A spec with flags and a per-component override parses into the
    /// expected configuration.
    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                log_components_only: false,
                is_colored: true,
                print_config: false,
            }
        );
    }
804
    /// Trailing semicolons are tolerated and `print_config` enables config
    /// printing.
    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                log_components_only: false,
                is_colored: true,
                print_config: true,
            }
        );
    }
820
    /// The `log_components_only` flag parses alongside component overrides.
    #[rstest]
    fn log_config_parsing_with_log_components_only() {
        let config =
            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Off,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Debug
                )]),
                log_components_only: true,
                is_colored: true,
                print_config: false,
            }
        );
    }
840
841 mod serial_tests {
843 use super::*;
844
        /// A logged record is written to the file with the expected plain-text
        /// formatting once the guard is dropped.
        #[rstest]
        fn test_logging_to_file() {
            let config = LoggerConfig {
                fileout_level: LevelFilter::Debug,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            // Freeze the clock so the timestamp in the output is deterministic.
            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            // Wait for the writer thread to create the log file.
            wait_until(
                || {
                    std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .any(|entry| entry.path().is_file())
                },
                Duration::from_secs(3),
            );

            // Dropping the guard flushes/closes the logger.
            drop(log_guard);
            wait_until(
                || {
                    let log_file_path = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                        .expect("No files found in directory")
                        .path();
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
            );
        }
907
        /// All messages queued before the guard is dropped must be drained to
        /// the file during shutdown (none lost).
        #[rstest]
        fn test_shutdown_drains_backlog_tail() {
            let config = LoggerConfig {
                stdout_level: LevelFilter::Off,
                fileout_level: LevelFilter::Info,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-TAIL"),
                UUID4::new(),
                config,
                file_config,
            )
            .expect("Failed to initialize logger");

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_700_000_000_000_000);

            // Queue a burst of messages, then trigger shutdown via guard drop.
            const N: usize = 1000;
            for i in 0..N {
                log::info!(component = "TailDrain"; "BacklogTest {i}");
            }

            drop(log_guard);

            let mut count = 0usize;
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
                            count = contents
                                .lines()
                                .filter(|l| l.contains("BacklogTest "))
                                .count();
                            count >= N
                        } else {
                            false
                        }
                    } else {
                        false
                    }
                },
                Duration::from_secs(5),
            );

            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
        }
972
        /// An `Info` record from a component capped at `Error` must be
        /// filtered out of the file output.
        #[rstest]
        fn test_log_component_level_filtering() {
            let config =
                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            drop(log_guard);
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        let log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.contains("RiskEngine")
                    } else {
                        false
                    }
                },
                Duration::from_secs(3),
            );

            assert!(
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file()),
                "Log file exists"
            );
        }
1027
        /// With `file_format = "json"` each record is written as one JSON
        /// object per line.
        #[rstest]
        fn test_logging_to_file_in_json_format() {
            let config =
                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                    .unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
                file_format: Some("json".to_string()),
                ..Default::default()
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            drop(log_guard);
            wait_until(
                || {
                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
                        .expect("Failed to read directory")
                        .filter_map(Result::ok)
                        .find(|entry| entry.path().is_file())
                    {
                        let log_file_path = log_file.path();
                        log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.is_empty()
                    } else {
                        false
                    }
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
            );
        }
1083
1084 #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
1085 #[rstest]
1086 fn test_file_rotation_and_backup_limits() {
1087 let temp_dir = tempdir().expect("Failed to create temporary directory");
1089 let dir_path = temp_dir.path().to_str().unwrap().to_string();
1090
1091 let max_backups = 3;
1093 let max_file_size = 100;
1094 let file_config = FileWriterConfig {
1095 directory: Some(dir_path.clone()),
1096 file_name: None,
1097 file_format: Some("log".to_string()),
1098 file_rotate: Some((max_file_size, max_backups).into()), };
1100
1101 let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
1103 let log_guard = Logger::init_with_config(
1104 TraderId::from("TRADER-001"),
1105 UUID4::new(),
1106 config,
1107 file_config,
1108 );
1109
1110 log::info!(
1111 component = "Test";
1112 "Test log message with enough content to exceed our small max file size limit"
1113 );
1114
1115 sleep(Duration::from_millis(100));
1116
1117 let files: Vec<_> = std::fs::read_dir(&dir_path)
1119 .expect("Failed to read directory")
1120 .filter_map(Result::ok)
1121 .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1122 .collect();
1123
1124 assert_eq!(files.len(), 1);
1126
1127 log::info!(
1128 component = "Test";
1129 "Test log message with enough content to exceed our small max file size limit"
1130 );
1131
1132 sleep(Duration::from_millis(100));
1133
1134 let files: Vec<_> = std::fs::read_dir(&dir_path)
1136 .expect("Failed to read directory")
1137 .filter_map(Result::ok)
1138 .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1139 .collect();
1140
1141 assert_eq!(files.len(), 2);
1143
1144 for _ in 0..5 {
1145 log::info!(
1147 component = "Test";
1148 "Test log message with enough content to exceed our small max file size limit"
1149 );
1150
1151 sleep(Duration::from_millis(100));
1152 }
1153
1154 let files: Vec<_> = std::fs::read_dir(&dir_path)
1156 .expect("Failed to read directory")
1157 .filter_map(Result::ok)
1158 .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
1159 .collect();
1160
1161 assert!(
1163 files.len() == max_backups as usize + 1,
1164 "Expected at most {} log files, found {}",
1165 max_backups,
1166 files.len()
1167 );
1168
1169 drop(log_guard);
1171 drop(temp_dir);
1172 }
1173 }
1174}