1use std::{
17 fmt::Display,
18 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
19};
20
21use ahash::AHashMap;
22use indexmap::IndexMap;
23use log::{
24 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
25 kv::{ToValue, Value},
26 set_boxed_logger, set_max_level,
27};
28use nautilus_core::{
29 UUID4, UnixNanos,
30 datetime::unix_nanos_to_iso8601,
31 time::{get_atomic_clock_realtime, get_atomic_clock_static},
32};
33use nautilus_model::identifiers::TraderId;
34use serde::{Deserialize, Serialize, Serializer};
35use ustr::Ustr;
36
37pub use super::config::LoggerConfig;
38use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
39use crate::{
40 enums::{LogColor, LogLevel},
41 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
42};
43
44const LOGGING: &str = "logging";
45const KV_COLOR: &str = "color";
46const KV_COMPONENT: &str = "component";
47
48static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();
50
51static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
53
54#[derive(Debug)]
60pub struct Logger {
61 pub config: LoggerConfig,
63 tx: std::sync::mpsc::Sender<LogEvent>,
65}
66
67#[derive(Debug)]
69pub enum LogEvent {
70 Log(LogLine),
72 Flush,
74 Close,
76}
77
78#[derive(Clone, Debug, Serialize, Deserialize)]
80pub struct LogLine {
81 pub timestamp: UnixNanos,
83 pub level: Level,
85 pub color: LogColor,
87 pub component: Ustr,
89 pub message: String,
91}
92
93impl Display for LogLine {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
96 }
97}
98
99#[derive(Clone, Debug)]
106pub struct LogLineWrapper {
107 line: LogLine,
109 cache: Option<String>,
111 colored: Option<String>,
113 trader_id: Ustr,
115}
116
117impl LogLineWrapper {
118 #[must_use]
120 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
121 Self {
122 line,
123 cache: None,
124 colored: None,
125 trader_id,
126 }
127 }
128
129 pub fn get_string(&mut self) -> &str {
134 self.cache.get_or_insert_with(|| {
135 format!(
136 "{} [{}] {}.{}: {}\n",
137 unix_nanos_to_iso8601(self.line.timestamp),
138 self.line.level,
139 self.trader_id,
140 &self.line.component,
141 &self.line.message,
142 )
143 })
144 }
145
146 pub fn get_colored(&mut self) -> &str {
152 self.colored.get_or_insert_with(|| {
153 format!(
154 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
155 unix_nanos_to_iso8601(self.line.timestamp),
156 &self.line.color.as_ansi(),
157 self.line.level,
158 self.trader_id,
159 &self.line.component,
160 &self.line.message,
161 )
162 })
163 }
164
165 #[must_use]
174 pub fn get_json(&self) -> String {
175 let json_string =
176 serde_json::to_string(&self).expect("Error serializing log event to string");
177 format!("{json_string}\n")
178 }
179}
180
181impl Serialize for LogLineWrapper {
182 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
183 where
184 S: Serializer,
185 {
186 let mut json_obj = IndexMap::new();
187 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
188 json_obj.insert("timestamp".to_string(), timestamp);
189 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
190 json_obj.insert("level".to_string(), self.line.level.to_string());
191 json_obj.insert("color".to_string(), self.line.color.to_string());
192 json_obj.insert("component".to_string(), self.line.component.to_string());
193 json_obj.insert("message".to_string(), self.line.message.clone());
194
195 json_obj.serialize(serializer)
196 }
197}
198
199impl Log for Logger {
200 fn enabled(&self, metadata: &log::Metadata) -> bool {
201 !LOGGING_BYPASSED.load(Ordering::Relaxed)
202 && (metadata.level() == Level::Error
203 || metadata.level() <= self.config.stdout_level
204 || metadata.level() <= self.config.fileout_level)
205 }
206
207 fn log(&self, record: &log::Record) {
208 if self.enabled(record.metadata()) {
209 let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
210 get_atomic_clock_realtime().get_time_ns()
211 } else {
212 get_atomic_clock_static().get_time_ns()
213 };
214 let level = record.level();
215 let key_values = record.key_values();
216 let color: LogColor = key_values
217 .get(KV_COLOR.into())
218 .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
219 .unwrap_or(level.into());
220 let component = key_values.get(KV_COMPONENT.into()).map_or_else(
221 || Ustr::from(record.metadata().target()),
222 |v| Ustr::from(&v.to_string()),
223 );
224
225 let line = LogLine {
226 timestamp,
227 level,
228 color,
229 component,
230 message: format!("{}", record.args()),
231 };
232 if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
233 eprintln!("Error sending log event (receiver closed): {line}");
234 }
235 }
236 }
237
238 fn flush(&self) {
239 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
241 return;
242 }
243
244 if let Err(e) = self.tx.send(LogEvent::Flush) {
245 eprintln!("Error sending flush log event: {e}");
246 }
247 }
248}
249
250#[allow(clippy::too_many_arguments)]
251impl Logger {
252 pub fn init_with_env(
258 trader_id: TraderId,
259 instance_id: UUID4,
260 file_config: FileWriterConfig,
261 ) -> anyhow::Result<LogGuard> {
262 let config = LoggerConfig::from_env()?;
263 Self::init_with_config(trader_id, instance_id, config, file_config)
264 }
265
266 pub fn init_with_config(
272 trader_id: TraderId,
273 instance_id: UUID4,
274 config: LoggerConfig,
275 file_config: FileWriterConfig,
276 ) -> anyhow::Result<LogGuard> {
277 if super::LOGGING_INITIALIZED.load(Ordering::SeqCst) {
279 return LogGuard::new()
280 .ok_or_else(|| anyhow::anyhow!("Logging already initialized but sender missing"));
281 }
282
283 let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
284
285 let logger_tx = tx.clone();
286 let logger = Self {
287 tx: logger_tx,
288 config: config.clone(),
289 };
290
291 set_boxed_logger(Box::new(logger))?;
292
293 if LOGGER_TX.set(tx).is_err() {
295 debug_assert!(
296 false,
297 "LOGGER_TX already set - re-initialization not supported"
298 );
299 }
300
301 let is_colored = config.is_colored;
302
303 let print_config = config.print_config;
304 if print_config {
305 println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
306 println!("Logger initialized with {config:?} {file_config:?}");
307 }
308
309 let handle = std::thread::Builder::new()
310 .name(LOGGING.to_string())
311 .spawn(move || {
312 Self::handle_messages(
313 trader_id.to_string(),
314 instance_id.to_string(),
315 config,
316 file_config,
317 rx,
318 );
319 })?;
320
321 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
323 debug_assert!(
324 handle_guard.is_none(),
325 "LOGGER_HANDLE already set - re-initialization not supported"
326 );
327 *handle_guard = Some(handle);
328 }
329
330 let max_level = log::LevelFilter::Trace;
331 set_max_level(max_level);
332
333 if print_config {
334 println!("Logger set as `log` implementation with max level {max_level}");
335 }
336
337 super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
338 super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
339
340 LogGuard::new()
341 .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
342 }
343
344 fn handle_messages(
345 trader_id: String,
346 instance_id: String,
347 config: LoggerConfig,
348 file_config: FileWriterConfig,
349 rx: std::sync::mpsc::Receiver<LogEvent>,
350 ) {
351 let LoggerConfig {
352 stdout_level,
353 fileout_level,
354 component_level,
355 module_level,
356 log_components_only,
357 is_colored,
358 print_config: _,
359 } = config;
360
361 let mut module_filters_sorted: Vec<(Ustr, LevelFilter)> =
363 module_level.into_iter().collect();
364 module_filters_sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
365
366 let trader_id_cache = Ustr::from(&trader_id);
367
368 let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
370 let mut stderr_writer = StderrWriter::new(is_colored);
371
372 let mut file_writer_opt = if fileout_level == LevelFilter::Off {
374 None
375 } else {
376 FileWriter::new(trader_id, instance_id, file_config, fileout_level)
377 };
378
379 let process_event = |event: LogEvent,
380 stdout_writer: &mut StdoutWriter,
381 stderr_writer: &mut StderrWriter,
382 file_writer_opt: &mut Option<FileWriter>| {
383 match event {
384 LogEvent::Log(line) => {
385 if should_filter_log(
386 &line.component,
387 line.level,
388 &module_filters_sorted,
389 &component_level,
390 log_components_only,
391 ) {
392 return;
393 }
394
395 let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
396
397 if stderr_writer.enabled(&wrapper.line) {
398 if is_colored {
399 stderr_writer.write(wrapper.get_colored());
400 } else {
401 stderr_writer.write(wrapper.get_string());
402 }
403 }
404
405 if stdout_writer.enabled(&wrapper.line) {
406 if is_colored {
407 stdout_writer.write(wrapper.get_colored());
408 } else {
409 stdout_writer.write(wrapper.get_string());
410 }
411 }
412
413 if let Some(file_writer) = file_writer_opt
414 && file_writer.enabled(&wrapper.line)
415 {
416 if file_writer.json_format {
417 file_writer.write(&wrapper.get_json());
418 } else {
419 file_writer.write(wrapper.get_string());
420 }
421 }
422 }
423 LogEvent::Flush => {
424 stdout_writer.flush();
425 stderr_writer.flush();
426
427 if let Some(file_writer) = file_writer_opt {
428 file_writer.flush();
429 }
430 }
431 LogEvent::Close => {
432 }
434 }
435 };
436
437 while let Ok(event) = rx.recv() {
439 match event {
440 LogEvent::Log(_) | LogEvent::Flush => process_event(
441 event,
442 &mut stdout_writer,
443 &mut stderr_writer,
444 &mut file_writer_opt,
445 ),
446 LogEvent::Close => {
447 stdout_writer.flush();
449 stderr_writer.flush();
450
451 if let Some(ref mut file_writer) = file_writer_opt {
452 file_writer.flush();
453 }
454
455 while let Ok(evt) = rx.try_recv() {
458 match evt {
459 LogEvent::Close => (), _ => process_event(
461 evt,
462 &mut stdout_writer,
463 &mut stderr_writer,
464 &mut file_writer_opt,
465 ),
466 }
467 }
468
469 stdout_writer.flush();
471 stderr_writer.flush();
472
473 if let Some(ref mut file_writer) = file_writer_opt {
474 file_writer.flush();
475 }
476
477 break;
478 }
479 }
480 }
481 }
482}
483
484#[must_use]
491pub fn should_filter_log(
492 component: &Ustr,
493 line_level: log::Level,
494 module_filters_sorted: &[(Ustr, LevelFilter)],
495 component_level: &AHashMap<Ustr, LevelFilter>,
496 log_components_only: bool,
497) -> bool {
498 if module_filters_sorted.is_empty() && component_level.is_empty() {
499 return log_components_only;
500 }
501
502 let module_filter = module_filters_sorted
504 .iter()
505 .find(|(path, _)| component.starts_with(path.as_str()))
506 .map(|(_, level)| *level);
507
508 let component_filter = component_level.get(component).copied();
509
510 if log_components_only && module_filter.is_none() && component_filter.is_none() {
511 return true;
512 }
513
514 if let Some(filter_level) = module_filter.or(component_filter)
516 && line_level > filter_level
517 {
518 return true;
519 }
520
521 false
522}
523
524pub(crate) fn shutdown_graceful() {
533 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
535 log::set_max_level(log::LevelFilter::Off);
536
537 if let Some(tx) = LOGGER_TX.get() {
539 let _ = tx.send(LogEvent::Close);
540 }
541
542 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
543 && let Some(handle) = handle_guard.take()
544 && handle.thread().id() != std::thread::current().id()
545 {
546 let _ = handle.join();
547 }
548
549 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
550}
551
552pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
553 let color = Value::from(color as u8);
554
555 match level {
556 LogLevel::Off => {}
557 LogLevel::Trace => {
558 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
559 }
560 LogLevel::Debug => {
561 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
562 }
563 LogLevel::Info => {
564 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
565 }
566 LogLevel::Warning => {
567 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
568 }
569 LogLevel::Error => {
570 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
571 }
572 }
573}
574
575#[cfg_attr(
602 feature = "python",
603 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
604)]
605#[derive(Debug)]
606pub struct LogGuard {
607 tx: std::sync::mpsc::Sender<LogEvent>,
608}
609
610impl LogGuard {
611 #[must_use]
619 pub fn new() -> Option<Self> {
620 LOGGER_TX.get().map(|tx| {
621 LOGGING_GUARDS_ACTIVE
622 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
623 if count == u8::MAX {
624 None } else {
626 Some(count + 1)
627 }
628 })
629 .expect("Maximum number of active LogGuards (255) exceeded");
630
631 Self { tx: tx.clone() }
632 })
633 }
634}
635
636impl Drop for LogGuard {
637 fn drop(&mut self) {
642 let previous_count = LOGGING_GUARDS_ACTIVE
643 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
644 assert!(count != 0, "LogGuard reference count underflow");
645 Some(count - 1)
646 })
647 .expect("Failed to decrement LogGuard count");
648
649 if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
651 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
655
656 log::set_max_level(log::LevelFilter::Off);
658
659 let _ = self.tx.send(LogEvent::Close);
661
662 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
664 && let Some(handle) = handle_guard.take()
665 {
666 if handle.thread().id() != std::thread::current().id() {
668 let _ = handle.join();
669 }
670 }
671
672 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
674 } else {
675 let _ = self.tx.send(LogEvent::Flush);
677 }
678 }
679}
680
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use ahash::AHashMap;
    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    /// Builds a [`LogLine`] from its parts.
    fn make_line(
        timestamp: UnixNanos,
        level: Level,
        color: LogColor,
        component: &str,
        message: &str,
    ) -> LogLine {
        LogLine {
            timestamp,
            level,
            color,
            component: Ustr::from(component),
            message: message.to_string(),
        }
    }

    /// Returns the path of the first regular file found in `dir`, if any.
    fn first_log_file(dir: &std::path::Path) -> Option<std::path::PathBuf> {
        std::fs::read_dir(dir)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .map(|entry| entry.path())
            .find(|path| path.is_file())
    }

    /// Returns a default [`FileWriterConfig`] that writes into `dir`.
    fn file_config_in(dir: &std::path::Path) -> FileWriterConfig {
        FileWriterConfig {
            directory: Some(dir.to_str().unwrap().to_string()),
            ..Default::default()
        }
    }

    #[rstest]
    fn log_message_serialization() {
        let line = make_line(
            UnixNanos::default(),
            log::Level::Info,
            LogColor::Normal,
            "Portfolio",
            "This is a log message",
        );

        let json = serde_json::to_string(&line).unwrap();
        let parsed: Value = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed["level"], "INFO");
        assert_eq!(parsed["component"], "Portfolio");
        assert_eq!(parsed["message"], "This is a log message");
    }

    #[rstest]
    fn log_config_parsing() {
        let expected = LoggerConfig {
            stdout_level: LevelFilter::Info,
            fileout_level: LevelFilter::Debug,
            component_level: [(Ustr::from("RiskEngine"), LevelFilter::Error)]
                .into_iter()
                .collect(),
            module_level: AHashMap::new(),
            log_components_only: false,
            is_colored: true,
            print_config: false,
        };

        let parsed =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();

        assert_eq!(parsed, expected);
    }

    #[rstest]
    fn log_config_parsing2() {
        let expected = LoggerConfig {
            stdout_level: LevelFilter::Warn,
            fileout_level: LevelFilter::Error,
            component_level: AHashMap::new(),
            module_level: AHashMap::new(),
            log_components_only: false,
            is_colored: true,
            print_config: true,
        };

        let parsed = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();

        assert_eq!(parsed, expected);
    }

    #[rstest]
    fn log_config_parsing_with_log_components_only() {
        let expected = LoggerConfig {
            stdout_level: LevelFilter::Info,
            fileout_level: LevelFilter::Off,
            component_level: [(Ustr::from("RiskEngine"), LevelFilter::Debug)]
                .into_iter()
                .collect(),
            module_level: AHashMap::new(),
            log_components_only: true,
            is_colored: true,
            print_config: false,
        };

        let parsed =
            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();

        assert_eq!(parsed, expected);
    }

    #[rstest]
    fn test_log_line_wrapper_plain_string() {
        let line = make_line(
            1_650_000_000_000_000_000.into(),
            log::Level::Info,
            LogColor::Normal,
            "TestComponent",
            "Test message",
        );

        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
        let rendered = wrapper.get_string();

        for expected in ["TRADER-001", "TestComponent", "Test message", "[INFO]"] {
            assert!(rendered.contains(expected));
        }
        assert!(rendered.ends_with('\n'));
        // Plain rendering must not carry ANSI escapes
        assert!(!rendered.contains("\x1b["));
    }

    #[rstest]
    fn test_log_line_wrapper_colored_string() {
        let line = make_line(
            1_650_000_000_000_000_000.into(),
            log::Level::Info,
            LogColor::Green,
            "TestComponent",
            "Test message",
        );

        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
        let rendered = wrapper.get_colored();

        for expected in ["TRADER-001", "TestComponent", "Test message", "\x1b["] {
            assert!(rendered.contains(expected));
        }
        assert!(rendered.ends_with('\n'));
    }

    #[rstest]
    fn test_log_line_wrapper_json_output() {
        let line = make_line(
            1_650_000_000_000_000_000.into(),
            log::Level::Warn,
            LogColor::Yellow,
            "RiskEngine",
            "Warning message",
        );

        let json = LogLineWrapper::new(line, Ustr::from("TRADER-002")).get_json();
        let parsed: Value = serde_json::from_str(json.trim()).unwrap();

        assert_eq!(parsed["trader_id"], "TRADER-002");
        assert_eq!(parsed["component"], "RiskEngine");
        assert_eq!(parsed["message"], "Warning message");
        assert_eq!(parsed["level"], "WARN");
        assert_eq!(parsed["color"], "YELLOW");
    }

    #[rstest]
    fn test_log_line_wrapper_caches_string() {
        let line = make_line(
            1_650_000_000_000_000_000.into(),
            log::Level::Info,
            LogColor::Normal,
            "Test",
            "Cached",
        );

        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
        let first_call = wrapper.get_string().to_string();
        let second_call = wrapper.get_string().to_string();

        assert_eq!(first_call, second_call);
    }

    #[rstest]
    fn test_log_line_display() {
        let line = make_line(
            0.into(),
            log::Level::Error,
            LogColor::Red,
            "Component",
            "Error occurred",
        );

        assert_eq!(format!("{line}"), "[ERROR] Component: Error occurred");
    }

    /// Sorts module filters longest path first, mirroring the logging thread.
    fn sorted_module_filters(map: AHashMap<Ustr, LevelFilter>) -> Vec<(Ustr, LevelFilter)> {
        let mut filters: Vec<_> = map.into_iter().collect();
        filters.sort_by(|lhs, rhs| rhs.0.len().cmp(&lhs.0.len()));
        filters
    }

    #[rstest]
    fn test_filter_no_filters_passes_all() {
        let module_filters: Vec<(Ustr, LevelFilter)> = Vec::new();
        let component_level: AHashMap<Ustr, LevelFilter> = AHashMap::new();

        let dropped = should_filter_log(
            &Ustr::from("anything"),
            Level::Trace,
            &module_filters,
            &component_level,
            false,
        );

        assert!(!dropped);
    }

    #[rstest]
    fn test_filter_component_exact_match() {
        let module_filters: Vec<(Ustr, LevelFilter)> = Vec::new();
        let component_level: AHashMap<Ustr, LevelFilter> =
            AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Error)]);
        let dropped = |component: &str, level: Level| {
            should_filter_log(
                &Ustr::from(component),
                level,
                &module_filters,
                &component_level,
                false,
            )
        };

        assert!(dropped("RiskEngine", Level::Info));
        assert!(!dropped("RiskEngine", Level::Error));
        assert!(!dropped("Portfolio", Level::Info));
    }

    #[rstest]
    fn test_filter_module_prefix_match() {
        let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
        let component_level: AHashMap<Ustr, LevelFilter> = AHashMap::new();
        let dropped = |component: &str, level: Level| {
            should_filter_log(
                &Ustr::from(component),
                level,
                &module_filters,
                &component_level,
                false,
            )
        };

        assert!(!dropped("nautilus_okx::websocket", Level::Debug));
        assert!(!dropped("nautilus_okx::websocket::handler", Level::Debug));
        assert!(dropped("nautilus_okx::websocket::handler", Level::Trace));
        assert!(!dropped("nautilus_binance::data", Level::Trace));
    }

    #[rstest]
    fn test_filter_longest_prefix_wins() {
        let module_filters = sorted_module_filters(AHashMap::from_iter([
            (Ustr::from("nautilus_okx"), LevelFilter::Error),
            (Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug),
        ]));
        let component_level: AHashMap<Ustr, LevelFilter> = AHashMap::new();
        let dropped = |component: &str, level: Level| {
            should_filter_log(
                &Ustr::from(component),
                level,
                &module_filters,
                &component_level,
                false,
            )
        };

        assert!(!dropped("nautilus_okx::websocket::handler", Level::Debug));
        assert!(dropped("nautilus_okx::data", Level::Debug));
        assert!(!dropped("nautilus_okx::data", Level::Error));
    }

    #[rstest]
    fn test_filter_module_precedence_over_component() {
        let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
        let component_level: AHashMap<Ustr, LevelFilter> =
            AHashMap::from_iter([(Ustr::from("nautilus_okx::websocket"), LevelFilter::Error)]);

        let dropped = should_filter_log(
            &Ustr::from("nautilus_okx::websocket"),
            Level::Debug,
            &module_filters,
            &component_level,
            false,
        );

        assert!(!dropped);
    }

    #[rstest]
    fn test_filter_log_components_only_blocks_unknown() {
        let module_filters: Vec<(Ustr, LevelFilter)> = Vec::new();
        let component_level: AHashMap<Ustr, LevelFilter> =
            AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Debug)]);
        let dropped = |component: &str, level: Level| {
            should_filter_log(
                &Ustr::from(component),
                level,
                &module_filters,
                &component_level,
                true,
            )
        };

        assert!(dropped("Portfolio", Level::Info));
        assert!(!dropped("RiskEngine", Level::Info));
    }

    #[rstest]
    fn test_filter_log_components_only_with_module() {
        let module_filters = vec![(Ustr::from("nautilus_okx"), LevelFilter::Debug)];
        let component_level: AHashMap<Ustr, LevelFilter> = AHashMap::new();
        let dropped = |component: &str, level: Level| {
            should_filter_log(
                &Ustr::from(component),
                level,
                &module_filters,
                &component_level,
                true,
            )
        };

        assert!(!dropped("nautilus_okx::websocket", Level::Debug));
        assert!(dropped("nautilus_binance::data", Level::Debug));
    }

    #[rstest]
    fn test_filter_level_comparison() {
        let module_filters: Vec<(Ustr, LevelFilter)> = Vec::new();
        let component_level: AHashMap<Ustr, LevelFilter> =
            AHashMap::from_iter([(Ustr::from("Test"), LevelFilter::Warn)]);
        let dropped = |level: Level| {
            should_filter_log(
                &Ustr::from("Test"),
                level,
                &module_filters,
                &component_level,
                false,
            )
        };

        assert!(!dropped(Level::Error));
        assert!(!dropped(Level::Warn));
        assert!(dropped(Level::Info));
        assert!(dropped(Level::Debug));
        assert!(dropped(Level::Trace));
    }

    mod serial_tests {
        use std::sync::atomic::Ordering;

        use super::*;
        use crate::logging::{LOGGING_BYPASSED, logging_is_initialized, logging_set_bypass};

        #[rstest]
        fn test_logging_to_file() {
            let config = LoggerConfig {
                fileout_level: LevelFilter::Debug,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = file_config_in(temp_dir.path());

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            wait_until(
                || first_log_file(temp_dir.path()).is_some(),
                Duration::from_secs(3),
            );

            drop(log_guard);

            wait_until(
                || {
                    let log_file_path =
                        first_log_file(temp_dir.path()).expect("No files found in directory");
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
            );
        }

        #[rstest]
        fn test_shutdown_drains_backlog_tail() {
            const N: usize = 1000;

            let config = LoggerConfig {
                stdout_level: LevelFilter::Off,
                fileout_level: LevelFilter::Info,
                ..Default::default()
            };

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = file_config_in(temp_dir.path());

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-TAIL"),
                UUID4::new(),
                config,
                file_config,
            )
            .expect("Failed to initialize logger");

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_700_000_000_000_000);

            for i in 0..N {
                log::info!(component = "TailDrain"; "BacklogTest {i}");
            }

            drop(log_guard);

            let mut count = 0usize;
            wait_until(
                || {
                    let Some(log_file_path) = first_log_file(temp_dir.path()) else {
                        return false;
                    };
                    let Ok(contents) = std::fs::read_to_string(log_file_path) else {
                        return false;
                    };
                    count = contents
                        .lines()
                        .filter(|l| l.contains("BacklogTest "))
                        .count();
                    count >= N
                },
                Duration::from_secs(5),
            );

            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
        }

        #[rstest]
        fn test_log_component_level_filtering() {
            let config =
                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = file_config_in(temp_dir.path());

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            drop(log_guard);

            wait_until(
                || match first_log_file(temp_dir.path()) {
                    Some(log_file_path) => {
                        let log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.contains("RiskEngine")
                    }
                    None => false,
                },
                Duration::from_secs(3),
            );

            assert!(
                first_log_file(temp_dir.path()).is_some(),
                "Log file exists"
            );
        }

        #[rstest]
        fn test_logging_to_file_in_json_format() {
            let config =
                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                    .unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = FileWriterConfig {
                file_format: Some("json".to_string()),
                ..file_config_in(temp_dir.path())
            };

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::info!(
                component = "RiskEngine";
                "This is a test."
            );

            let mut log_contents = String::new();

            drop(log_guard);

            wait_until(
                || match first_log_file(temp_dir.path()) {
                    Some(log_file_path) => {
                        log_contents = std::fs::read_to_string(log_file_path)
                            .expect("Error while reading log file");
                        !log_contents.is_empty()
                    }
                    None => false,
                },
                Duration::from_secs(3),
            );

            assert_eq!(
                log_contents,
                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
            );
        }

        #[rstest]
        fn test_init_sets_logging_is_initialized_flag() {
            let guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                LoggerConfig::default(),
                FileWriterConfig::default(),
            );
            assert!(guard.is_ok());
            assert!(logging_is_initialized());

            drop(guard);
            assert!(!logging_is_initialized());
        }

        #[rstest]
        fn test_reinit_after_guard_drop_fails() {
            let config = LoggerConfig::default();
            let file_config = FileWriterConfig::default();

            let first_guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config.clone(),
                file_config.clone(),
            );
            assert!(first_guard.is_ok());
            drop(first_guard);

            // A second initialization after shutdown is expected to fail
            let second_guard = Logger::init_with_config(
                TraderId::from("TRADER-002"),
                UUID4::new(),
                config,
                file_config,
            );
            assert!(second_guard.is_err());
        }

        #[rstest]
        fn test_bypass_before_init_prevents_logging() {
            logging_set_bypass();
            assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let config = LoggerConfig {
                fileout_level: LevelFilter::Debug,
                ..Default::default()
            };
            let file_config = file_config_in(temp_dir.path());

            let guard = Logger::init_with_config(
                TraderId::from("TRADER-001"),
                UUID4::new(),
                config,
                file_config,
            );
            assert!(guard.is_ok());

            log::info!(
                component = "TestComponent";
                "This should be bypassed"
            );
            std::thread::sleep(Duration::from_millis(100));
            drop(guard);

            assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
        }

        #[rstest]
        fn test_module_level_filtering() {
            let config = LoggerConfig::from_spec(
                "stdout=Off;fileout=Trace;nautilus::adapters=Warn;nautilus::adapters::okx=Debug",
            )
            .unwrap();

            let temp_dir = tempdir().expect("Failed to create temporary directory");
            let file_config = file_config_in(temp_dir.path());

            let log_guard = Logger::init_with_config(
                TraderId::from("TRADER-MOD"),
                UUID4::new(),
                config,
                file_config,
            )
            .expect("Failed to initialize logger");

            logging_clock_set_static_mode();
            logging_clock_set_static_time(1_650_000_000_000_000);

            log::debug!(
                component = "nautilus::adapters::okx::websocket";
                "OKX debug message"
            );

            log::info!(
                component = "nautilus::adapters::okx";
                "OKX info message"
            );

            log::info!(
                component = "nautilus::adapters::binance";
                "Binance info message SHOULD NOT APPEAR"
            );

            log::warn!(
                component = "nautilus::adapters::binance";
                "Binance warn message"
            );

            log::trace!(
                component = "Portfolio";
                "Portfolio trace message"
            );

            drop(log_guard);

            wait_until(
                || first_log_file(temp_dir.path()).is_some(),
                Duration::from_secs(3),
            );

            let log_file_path = first_log_file(temp_dir.path()).expect("No log file found");

            let log_contents =
                std::fs::read_to_string(log_file_path).expect("Error reading log file");

            let expected_present = [
                (
                    "OKX debug message",
                    "OKX debug should pass (longer prefix wins)",
                ),
                ("OKX info message", "OKX info should pass"),
                ("Binance warn message", "Binance warn should pass"),
                ("Portfolio trace message", "Unfiltered component should pass"),
            ];
            for (needle, reason) in expected_present {
                assert!(log_contents.contains(needle), "{reason}");
            }

            assert!(
                !log_contents.contains("SHOULD NOT APPEAR"),
                "Binance info should be filtered (adapters=Warn)"
            );
        }
    }
}