1use std::{
17 fmt::Display,
18 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
19};
20
21use indexmap::IndexMap;
22use log::{
23 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
24 kv::{ToValue, Value},
25 set_boxed_logger, set_max_level,
26};
27use nautilus_core::{
28 UUID4, UnixNanos,
29 datetime::unix_nanos_to_iso8601,
30 time::{get_atomic_clock_realtime, get_atomic_clock_static},
31};
32use nautilus_model::identifiers::TraderId;
33use serde::{Deserialize, Serialize, Serializer};
34use ustr::Ustr;
35
36pub use super::config::LoggerConfig;
37use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
38use crate::{
39 enums::{LogColor, LogLevel},
40 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
41};
42
/// Name given to the dedicated logging thread.
const LOGGING: &str = "logging";
/// Key under which a log record's key-values carry the line color.
const KV_COLOR: &str = "color";
/// Key under which a log record's key-values carry the component name.
const KV_COMPONENT: &str = "component";

/// Global sender for the log event channel.
///
/// Set once during logger initialization and cloned by every `LogGuard`.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Join handle of the logging thread, taken (and joined) when logging shuts down.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
52
/// A [`Log`] implementation that forwards accepted records as [`LogEvent`]s over a channel.
#[derive(Debug)]
pub struct Logger {
    /// Logging configuration; its stdout/fileout levels decide whether a record is enabled.
    pub config: LoggerConfig,
    /// Sending half of the log event channel.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
65
/// An event sent over the logging channel to the logging thread.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line to be written.
    Log(LogLine),
    /// Request to flush all writers.
    Flush,
    /// Request to shut down: the logging thread flushes, drains pending events and exits.
    Close,
}
76
/// A single log line as captured by the [`Logger`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// Timestamp at which the record was captured.
    pub timestamp: UnixNanos,
    /// Severity level of the record.
    pub level: Level,
    /// Color used when the line is rendered with ANSI codes.
    pub color: LogColor,
    /// Component the line originates from (falls back to the record target).
    pub component: Ustr,
    /// The formatted log message.
    pub message: String,
}
91
92impl Display for LogLine {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
95 }
96}
97
/// Wraps a [`LogLine`] together with the trader ID and lazily built, cached
/// string renderings of the line.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line.
    line: LogLine,
    /// Cached plain rendering, built on the first `get_string` call.
    cache: Option<String>,
    /// Cached ANSI-colored rendering, built on the first `get_colored` call.
    colored: Option<String>,
    /// Trader ID included in every rendering.
    trader_id: Ustr,
}
115
116impl LogLineWrapper {
117 #[must_use]
119 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
120 Self {
121 line,
122 cache: None,
123 colored: None,
124 trader_id,
125 }
126 }
127
128 pub fn get_string(&mut self) -> &str {
133 self.cache.get_or_insert_with(|| {
134 format!(
135 "{} [{}] {}.{}: {}\n",
136 unix_nanos_to_iso8601(self.line.timestamp),
137 self.line.level,
138 self.trader_id,
139 &self.line.component,
140 &self.line.message,
141 )
142 })
143 }
144
145 pub fn get_colored(&mut self) -> &str {
151 self.colored.get_or_insert_with(|| {
152 format!(
153 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
154 unix_nanos_to_iso8601(self.line.timestamp),
155 &self.line.color.as_ansi(),
156 self.line.level,
157 self.trader_id,
158 &self.line.component,
159 &self.line.message,
160 )
161 })
162 }
163
164 #[must_use]
173 pub fn get_json(&self) -> String {
174 let json_string =
175 serde_json::to_string(&self).expect("Error serializing log event to string");
176 format!("{json_string}\n")
177 }
178}
179
180impl Serialize for LogLineWrapper {
181 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
182 where
183 S: Serializer,
184 {
185 let mut json_obj = IndexMap::new();
186 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
187 json_obj.insert("timestamp".to_string(), timestamp);
188 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
189 json_obj.insert("level".to_string(), self.line.level.to_string());
190 json_obj.insert("color".to_string(), self.line.color.to_string());
191 json_obj.insert("component".to_string(), self.line.component.to_string());
192 json_obj.insert("message".to_string(), self.line.message.clone());
193
194 json_obj.serialize(serializer)
195 }
196}
197
198impl Log for Logger {
199 fn enabled(&self, metadata: &log::Metadata) -> bool {
200 !LOGGING_BYPASSED.load(Ordering::Relaxed)
201 && (metadata.level() == Level::Error
202 || metadata.level() <= self.config.stdout_level
203 || metadata.level() <= self.config.fileout_level)
204 }
205
206 fn log(&self, record: &log::Record) {
207 if self.enabled(record.metadata()) {
208 let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
209 get_atomic_clock_realtime().get_time_ns()
210 } else {
211 get_atomic_clock_static().get_time_ns()
212 };
213 let level = record.level();
214 let key_values = record.key_values();
215 let color: LogColor = key_values
216 .get(KV_COLOR.into())
217 .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
218 .unwrap_or(level.into());
219 let component = key_values.get(KV_COMPONENT.into()).map_or_else(
220 || Ustr::from(record.metadata().target()),
221 |v| Ustr::from(&v.to_string()),
222 );
223
224 let line = LogLine {
225 timestamp,
226 level,
227 color,
228 component,
229 message: format!("{}", record.args()),
230 };
231 if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
232 eprintln!("Error sending log event (receiver closed): {line}");
233 }
234 }
235 }
236
237 fn flush(&self) {
238 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
240 return;
241 }
242
243 if let Err(e) = self.tx.send(LogEvent::Flush) {
244 eprintln!("Error sending flush log event: {e}");
245 }
246 }
247}
248
249#[allow(clippy::too_many_arguments)]
250impl Logger {
251 pub fn init_with_env(
257 trader_id: TraderId,
258 instance_id: UUID4,
259 file_config: FileWriterConfig,
260 ) -> anyhow::Result<LogGuard> {
261 let config = LoggerConfig::from_env()?;
262 Self::init_with_config(trader_id, instance_id, config, file_config)
263 }
264
265 pub fn init_with_config(
271 trader_id: TraderId,
272 instance_id: UUID4,
273 config: LoggerConfig,
274 file_config: FileWriterConfig,
275 ) -> anyhow::Result<LogGuard> {
276 let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
277
278 let logger_tx = tx.clone();
279 let logger = Self {
280 tx: logger_tx,
281 config: config.clone(),
282 };
283
284 set_boxed_logger(Box::new(logger))?;
285
286 if LOGGER_TX.set(tx).is_err() {
288 debug_assert!(
289 false,
290 "LOGGER_TX already set - re-initialization not supported"
291 );
292 }
293
294 let is_colored = config.is_colored;
295
296 let print_config = config.print_config;
297 if print_config {
298 println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
299 println!("Logger initialized with {config:?} {file_config:?}");
300 }
301
302 let handle = std::thread::Builder::new()
303 .name(LOGGING.to_string())
304 .spawn(move || {
305 Self::handle_messages(
306 trader_id.to_string(),
307 instance_id.to_string(),
308 config,
309 file_config,
310 rx,
311 );
312 })?;
313
314 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
316 debug_assert!(
317 handle_guard.is_none(),
318 "LOGGER_HANDLE already set - re-initialization not supported"
319 );
320 *handle_guard = Some(handle);
321 }
322
323 let max_level = log::LevelFilter::Trace;
324 set_max_level(max_level);
325
326 if print_config {
327 println!("Logger set as `log` implementation with max level {max_level}");
328 }
329
330 super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
331 super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
332
333 LogGuard::new()
334 .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
335 }
336
337 fn handle_messages(
338 trader_id: String,
339 instance_id: String,
340 config: LoggerConfig,
341 file_config: FileWriterConfig,
342 rx: std::sync::mpsc::Receiver<LogEvent>,
343 ) {
344 let LoggerConfig {
345 stdout_level,
346 fileout_level,
347 component_level,
348 log_components_only,
349 is_colored,
350 print_config: _,
351 } = config;
352
353 let trader_id_cache = Ustr::from(&trader_id);
354
355 let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
357 let mut stderr_writer = StderrWriter::new(is_colored);
358
359 let mut file_writer_opt = if fileout_level == LevelFilter::Off {
361 None
362 } else {
363 FileWriter::new(trader_id, instance_id, file_config, fileout_level)
364 };
365
366 let process_event = |event: LogEvent,
367 stdout_writer: &mut StdoutWriter,
368 stderr_writer: &mut StderrWriter,
369 file_writer_opt: &mut Option<FileWriter>| {
370 match event {
371 LogEvent::Log(line) => {
372 let component_filter_level = component_level.get(&line.component);
373
374 if log_components_only && component_filter_level.is_none() {
375 return;
376 }
377
378 if let Some(&filter_level) = component_filter_level
379 && line.level > filter_level
380 {
381 return;
382 }
383
384 let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
385
386 if stderr_writer.enabled(&wrapper.line) {
387 if is_colored {
388 stderr_writer.write(wrapper.get_colored());
389 } else {
390 stderr_writer.write(wrapper.get_string());
391 }
392 }
393
394 if stdout_writer.enabled(&wrapper.line) {
395 if is_colored {
396 stdout_writer.write(wrapper.get_colored());
397 } else {
398 stdout_writer.write(wrapper.get_string());
399 }
400 }
401
402 if let Some(file_writer) = file_writer_opt
403 && file_writer.enabled(&wrapper.line)
404 {
405 if file_writer.json_format {
406 file_writer.write(&wrapper.get_json());
407 } else {
408 file_writer.write(wrapper.get_string());
409 }
410 }
411 }
412 LogEvent::Flush => {
413 stdout_writer.flush();
414 stderr_writer.flush();
415
416 if let Some(file_writer) = file_writer_opt {
417 file_writer.flush();
418 }
419 }
420 LogEvent::Close => {
421 }
423 }
424 };
425
426 while let Ok(event) = rx.recv() {
428 match event {
429 LogEvent::Log(_) | LogEvent::Flush => process_event(
430 event,
431 &mut stdout_writer,
432 &mut stderr_writer,
433 &mut file_writer_opt,
434 ),
435 LogEvent::Close => {
436 stdout_writer.flush();
438 stderr_writer.flush();
439
440 if let Some(ref mut file_writer) = file_writer_opt {
441 file_writer.flush();
442 }
443
444 while let Ok(evt) = rx.try_recv() {
447 match evt {
448 LogEvent::Close => (), _ => process_event(
450 evt,
451 &mut stdout_writer,
452 &mut stderr_writer,
453 &mut file_writer_opt,
454 ),
455 }
456 }
457
458 stdout_writer.flush();
460 stderr_writer.flush();
461
462 if let Some(ref mut file_writer) = file_writer_opt {
463 file_writer.flush();
464 }
465
466 break;
467 }
468 }
469 }
470 }
471}
472
473pub(crate) fn shutdown_graceful() {
482 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
484 log::set_max_level(log::LevelFilter::Off);
485
486 if let Some(tx) = LOGGER_TX.get() {
488 let _ = tx.send(LogEvent::Close);
489 }
490
491 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
492 && let Some(handle) = handle_guard.take()
493 && handle.thread().id() != std::thread::current().id()
494 {
495 let _ = handle.join();
496 }
497
498 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
499}
500
501pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
502 let color = Value::from(color as u8);
503
504 match level {
505 LogLevel::Off => {}
506 LogLevel::Trace => {
507 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
508 }
509 LogLevel::Debug => {
510 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
511 }
512 LogLevel::Info => {
513 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
514 }
515 LogLevel::Warning => {
516 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
517 }
518 LogLevel::Error => {
519 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
520 }
521 }
522}
523
/// A guard tied to the global logging system.
///
/// Each guard holds a clone of the global log event sender. Creating a guard
/// increments the active guard count and dropping it decrements the count; dropping
/// the last active guard shuts the logging thread down.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to deliver `Flush`/`Close` events when the guard is dropped.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
558
559impl LogGuard {
560 #[must_use]
568 pub fn new() -> Option<Self> {
569 LOGGER_TX.get().map(|tx| {
570 LOGGING_GUARDS_ACTIVE
571 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
572 if count == u8::MAX {
573 None } else {
575 Some(count + 1)
576 }
577 })
578 .expect("Maximum number of active LogGuards (255) exceeded");
579
580 Self { tx: tx.clone() }
581 })
582 }
583}
584
585impl Drop for LogGuard {
586 fn drop(&mut self) {
591 let previous_count = LOGGING_GUARDS_ACTIVE
592 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
593 if count == 0 {
594 panic!("LogGuard reference count underflow");
595 }
596 Some(count - 1)
597 })
598 .expect("Failed to decrement LogGuard count");
599
600 if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
602 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
606
607 log::set_max_level(log::LevelFilter::Off);
609
610 let _ = self.tx.send(LogEvent::Close);
612
613 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
615 && let Some(handle) = handle_guard.take()
616 {
617 if handle.thread().id() != std::thread::current().id() {
619 let _ = handle.join();
620 }
621 }
622
623 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
625 } else {
626 let _ = self.tx.send(LogEvent::Flush);
628 }
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use std::time::Duration;
635
636 use ahash::AHashMap;
637 use log::LevelFilter;
638 use nautilus_core::UUID4;
639 use nautilus_model::identifiers::TraderId;
640 use rstest::*;
641 use serde_json::Value;
642 use tempfile::tempdir;
643 use ustr::Ustr;
644
645 use super::*;
646 use crate::{
647 enums::LogColor,
648 logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
649 testing::wait_until,
650 };
651
652 #[rstest]
653 fn log_message_serialization() {
654 let log_message = LogLine {
655 timestamp: UnixNanos::default(),
656 level: log::Level::Info,
657 color: LogColor::Normal,
658 component: Ustr::from("Portfolio"),
659 message: "This is a log message".to_string(),
660 };
661
662 let serialized_json = serde_json::to_string(&log_message).unwrap();
663 let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
664
665 assert_eq!(deserialized_value["level"], "INFO");
666 assert_eq!(deserialized_value["component"], "Portfolio");
667 assert_eq!(deserialized_value["message"], "This is a log message");
668 }
669
670 #[rstest]
671 fn log_config_parsing() {
672 let config =
673 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
674 .unwrap();
675 assert_eq!(
676 config,
677 LoggerConfig {
678 stdout_level: LevelFilter::Info,
679 fileout_level: LevelFilter::Debug,
680 component_level: AHashMap::from_iter(vec![(
681 Ustr::from("RiskEngine"),
682 LevelFilter::Error
683 )]),
684 log_components_only: false,
685 is_colored: true,
686 print_config: false,
687 }
688 );
689 }
690
691 #[rstest]
692 fn log_config_parsing2() {
693 let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
694 assert_eq!(
695 config,
696 LoggerConfig {
697 stdout_level: LevelFilter::Warn,
698 fileout_level: LevelFilter::Error,
699 component_level: AHashMap::new(),
700 log_components_only: false,
701 is_colored: true,
702 print_config: true,
703 }
704 );
705 }
706
707 #[rstest]
708 fn log_config_parsing_with_log_components_only() {
709 let config =
710 LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
711 assert_eq!(
712 config,
713 LoggerConfig {
714 stdout_level: LevelFilter::Info,
715 fileout_level: LevelFilter::Off,
716 component_level: AHashMap::from_iter(vec![(
717 Ustr::from("RiskEngine"),
718 LevelFilter::Debug
719 )]),
720 log_components_only: true,
721 is_colored: true,
722 print_config: false,
723 }
724 );
725 }
726
727 #[rstest]
728 fn test_log_line_wrapper_plain_string() {
729 let line = LogLine {
730 timestamp: 1_650_000_000_000_000_000.into(),
731 level: log::Level::Info,
732 color: LogColor::Normal,
733 component: Ustr::from("TestComponent"),
734 message: "Test message".to_string(),
735 };
736
737 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
738 let result = wrapper.get_string();
739
740 assert!(result.contains("TRADER-001"));
741 assert!(result.contains("TestComponent"));
742 assert!(result.contains("Test message"));
743 assert!(result.contains("[INFO]"));
744 assert!(result.ends_with('\n'));
745 assert!(!result.contains("\x1b["));
747 }
748
749 #[rstest]
750 fn test_log_line_wrapper_colored_string() {
751 let line = LogLine {
752 timestamp: 1_650_000_000_000_000_000.into(),
753 level: log::Level::Info,
754 color: LogColor::Green,
755 component: Ustr::from("TestComponent"),
756 message: "Test message".to_string(),
757 };
758
759 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
760 let result = wrapper.get_colored();
761
762 assert!(result.contains("TRADER-001"));
763 assert!(result.contains("TestComponent"));
764 assert!(result.contains("Test message"));
765 assert!(result.contains("\x1b["));
767 assert!(result.ends_with('\n'));
768 }
769
770 #[rstest]
771 fn test_log_line_wrapper_json_output() {
772 let line = LogLine {
773 timestamp: 1_650_000_000_000_000_000.into(),
774 level: log::Level::Warn,
775 color: LogColor::Yellow,
776 component: Ustr::from("RiskEngine"),
777 message: "Warning message".to_string(),
778 };
779
780 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
781 let json = wrapper.get_json();
782
783 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
784 assert_eq!(parsed["trader_id"], "TRADER-002");
785 assert_eq!(parsed["component"], "RiskEngine");
786 assert_eq!(parsed["message"], "Warning message");
787 assert_eq!(parsed["level"], "WARN");
788 assert_eq!(parsed["color"], "YELLOW");
789 }
790
791 #[rstest]
792 fn test_log_line_wrapper_caches_string() {
793 let line = LogLine {
794 timestamp: 1_650_000_000_000_000_000.into(),
795 level: log::Level::Info,
796 color: LogColor::Normal,
797 component: Ustr::from("Test"),
798 message: "Cached".to_string(),
799 };
800
801 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
802 let first = wrapper.get_string().to_string();
803 let second = wrapper.get_string().to_string();
804
805 assert_eq!(first, second);
806 }
807
808 #[rstest]
809 fn test_log_line_display() {
810 let line = LogLine {
811 timestamp: 0.into(),
812 level: log::Level::Error,
813 color: LogColor::Red,
814 component: Ustr::from("Component"),
815 message: "Error occurred".to_string(),
816 };
817
818 let display = format!("{line}");
819 assert_eq!(display, "[ERROR] Component: Error occurred");
820 }
821
822 mod serial_tests {
825 use std::sync::atomic::Ordering;
826
827 use super::*;
828 use crate::logging::{LOGGING_BYPASSED, logging_is_initialized, logging_set_bypass};
829
830 #[rstest]
831 fn test_logging_to_file() {
832 let config = LoggerConfig {
833 fileout_level: LevelFilter::Debug,
834 ..Default::default()
835 };
836
837 let temp_dir = tempdir().expect("Failed to create temporary directory");
838 let file_config = FileWriterConfig {
839 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
840 ..Default::default()
841 };
842
843 let log_guard = Logger::init_with_config(
844 TraderId::from("TRADER-001"),
845 UUID4::new(),
846 config,
847 file_config,
848 );
849
850 logging_clock_set_static_mode();
851 logging_clock_set_static_time(1_650_000_000_000_000);
852
853 log::info!(
854 component = "RiskEngine";
855 "This is a test."
856 );
857
858 let mut log_contents = String::new();
859
860 wait_until(
861 || {
862 std::fs::read_dir(&temp_dir)
863 .expect("Failed to read directory")
864 .filter_map(Result::ok)
865 .any(|entry| entry.path().is_file())
866 },
867 Duration::from_secs(3),
868 );
869
870 drop(log_guard); wait_until(
873 || {
874 let log_file_path = std::fs::read_dir(&temp_dir)
875 .expect("Failed to read directory")
876 .filter_map(Result::ok)
877 .find(|entry| entry.path().is_file())
878 .expect("No files found in directory")
879 .path();
880 log_contents = std::fs::read_to_string(log_file_path)
881 .expect("Error while reading log file");
882 !log_contents.is_empty()
883 },
884 Duration::from_secs(3),
885 );
886
887 assert_eq!(
888 log_contents,
889 "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
890 );
891 }
892
893 #[rstest]
894 fn test_shutdown_drains_backlog_tail() {
895 let config = LoggerConfig {
897 stdout_level: LevelFilter::Off,
898 fileout_level: LevelFilter::Info,
899 ..Default::default()
900 };
901
902 let temp_dir = tempdir().expect("Failed to create temporary directory");
903 let file_config = FileWriterConfig {
904 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
905 ..Default::default()
906 };
907
908 let log_guard = Logger::init_with_config(
909 TraderId::from("TRADER-TAIL"),
910 UUID4::new(),
911 config,
912 file_config,
913 )
914 .expect("Failed to initialize logger");
915
916 logging_clock_set_static_mode();
918 logging_clock_set_static_time(1_700_000_000_000_000);
919
920 const N: usize = 1000;
922 for i in 0..N {
923 log::info!(component = "TailDrain"; "BacklogTest {i}");
924 }
925
926 drop(log_guard);
928
929 let mut count = 0usize;
931 wait_until(
932 || {
933 if let Some(log_file) = std::fs::read_dir(&temp_dir)
934 .expect("Failed to read directory")
935 .filter_map(Result::ok)
936 .find(|entry| entry.path().is_file())
937 {
938 let log_file_path = log_file.path();
939 if let Ok(contents) = std::fs::read_to_string(log_file_path) {
940 count = contents
941 .lines()
942 .filter(|l| l.contains("BacklogTest "))
943 .count();
944 count >= N
945 } else {
946 false
947 }
948 } else {
949 false
950 }
951 },
952 Duration::from_secs(5),
953 );
954
955 assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
956 }
957
958 #[rstest]
959 fn test_log_component_level_filtering() {
960 let config =
961 LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
962
963 let temp_dir = tempdir().expect("Failed to create temporary directory");
964 let file_config = FileWriterConfig {
965 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
966 ..Default::default()
967 };
968
969 let log_guard = Logger::init_with_config(
970 TraderId::from("TRADER-001"),
971 UUID4::new(),
972 config,
973 file_config,
974 );
975
976 logging_clock_set_static_mode();
977 logging_clock_set_static_time(1_650_000_000_000_000);
978
979 log::info!(
980 component = "RiskEngine";
981 "This is a test."
982 );
983
984 drop(log_guard); wait_until(
987 || {
988 if let Some(log_file) = std::fs::read_dir(&temp_dir)
989 .expect("Failed to read directory")
990 .filter_map(Result::ok)
991 .find(|entry| entry.path().is_file())
992 {
993 let log_file_path = log_file.path();
994 let log_contents = std::fs::read_to_string(log_file_path)
995 .expect("Error while reading log file");
996 !log_contents.contains("RiskEngine")
997 } else {
998 false
999 }
1000 },
1001 Duration::from_secs(3),
1002 );
1003
1004 assert!(
1005 std::fs::read_dir(&temp_dir)
1006 .expect("Failed to read directory")
1007 .filter_map(Result::ok)
1008 .any(|entry| entry.path().is_file()),
1009 "Log file exists"
1010 );
1011 }
1012
1013 #[rstest]
1014 fn test_logging_to_file_in_json_format() {
1015 let config =
1016 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1017 .unwrap();
1018
1019 let temp_dir = tempdir().expect("Failed to create temporary directory");
1020 let file_config = FileWriterConfig {
1021 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1022 file_format: Some("json".to_string()),
1023 ..Default::default()
1024 };
1025
1026 let log_guard = Logger::init_with_config(
1027 TraderId::from("TRADER-001"),
1028 UUID4::new(),
1029 config,
1030 file_config,
1031 );
1032
1033 logging_clock_set_static_mode();
1034 logging_clock_set_static_time(1_650_000_000_000_000);
1035
1036 log::info!(
1037 component = "RiskEngine";
1038 "This is a test."
1039 );
1040
1041 let mut log_contents = String::new();
1042
1043 drop(log_guard); wait_until(
1046 || {
1047 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1048 .expect("Failed to read directory")
1049 .filter_map(Result::ok)
1050 .find(|entry| entry.path().is_file())
1051 {
1052 let log_file_path = log_file.path();
1053 log_contents = std::fs::read_to_string(log_file_path)
1054 .expect("Error while reading log file");
1055 !log_contents.is_empty()
1056 } else {
1057 false
1058 }
1059 },
1060 Duration::from_secs(3),
1061 );
1062
1063 assert_eq!(
1064 log_contents,
1065 "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
1066 );
1067 }
1068
1069 #[rstest]
1070 fn test_init_sets_logging_is_initialized_flag() {
1071 let config = LoggerConfig::default();
1072 let file_config = FileWriterConfig::default();
1073
1074 let guard = Logger::init_with_config(
1075 TraderId::from("TRADER-001"),
1076 UUID4::new(),
1077 config,
1078 file_config,
1079 );
1080 assert!(guard.is_ok());
1081 assert!(logging_is_initialized());
1082
1083 drop(guard);
1084 assert!(!logging_is_initialized());
1085 }
1086
1087 #[rstest]
1088 fn test_reinit_after_guard_drop_fails() {
1089 let config = LoggerConfig::default();
1090 let file_config = FileWriterConfig::default();
1091
1092 let guard1 = Logger::init_with_config(
1093 TraderId::from("TRADER-001"),
1094 UUID4::new(),
1095 config.clone(),
1096 file_config.clone(),
1097 );
1098 assert!(guard1.is_ok());
1099 drop(guard1);
1100
1101 let guard2 = Logger::init_with_config(
1103 TraderId::from("TRADER-002"),
1104 UUID4::new(),
1105 config,
1106 file_config,
1107 );
1108 assert!(guard2.is_err());
1109 }
1110
1111 #[rstest]
1112 fn test_bypass_before_init_prevents_logging() {
1113 logging_set_bypass();
1114 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1115
1116 let temp_dir = tempdir().expect("Failed to create temporary directory");
1117 let config = LoggerConfig {
1118 fileout_level: LevelFilter::Debug,
1119 ..Default::default()
1120 };
1121 let file_config = FileWriterConfig {
1122 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1123 ..Default::default()
1124 };
1125
1126 let guard = Logger::init_with_config(
1127 TraderId::from("TRADER-001"),
1128 UUID4::new(),
1129 config,
1130 file_config,
1131 );
1132 assert!(guard.is_ok());
1133
1134 log::info!(
1135 component = "TestComponent";
1136 "This should be bypassed"
1137 );
1138 std::thread::sleep(Duration::from_millis(100));
1139 drop(guard);
1140
1141 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1143 }
1144 }
1145}