1use std::{
17 fmt::Display,
18 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
19};
20
21use ahash::AHashMap;
22use indexmap::IndexMap;
23use log::{
24 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
25 kv::{ToValue, Value},
26 set_boxed_logger, set_max_level,
27};
28use nautilus_core::{
29 UUID4, UnixNanos,
30 datetime::unix_nanos_to_iso8601,
31 time::{get_atomic_clock_realtime, get_atomic_clock_static},
32};
33use nautilus_model::identifiers::TraderId;
34use serde::{Deserialize, Serialize, Serializer};
35use ustr::Ustr;
36
37pub use super::config::LoggerConfig;
38use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
39use crate::{
40 enums::{LogColor, LogLevel},
41 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
42};
43
/// Name given to the background thread that processes log events.
const LOGGING: &str = "logging";
/// Key of the structured key-value entry that carries the log color.
const KV_COLOR: &str = "color";
/// Key of the structured key-value entry that carries the component name.
const KV_COMPONENT: &str = "component";

/// Global sender for the log event channel, set once by `Logger::init_with_config`.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

/// Join handle of the logging thread, taken (and joined) during shutdown.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
53
/// A `log::Log` implementation that forwards records over a channel to the logging thread.
#[derive(Debug)]
pub struct Logger {
    /// Configuration used to decide which records are enabled.
    pub config: LoggerConfig,
    /// Sending half of the channel consumed by the logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
66
/// Messages sent to the logging thread.
#[derive(Debug)]
pub enum LogEvent {
    /// A log line to be filtered and written.
    Log(LogLine),
    /// Request to flush all writers.
    Flush,
    /// Request to drain pending events, flush, and stop the logging thread.
    Close,
}
77
/// A single log record as sent to the logging thread.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// Timestamp taken when the record was logged.
    pub timestamp: UnixNanos,
    /// Severity level of the record.
    pub level: Level,
    /// Color used when the line is rendered with ANSI codes.
    pub color: LogColor,
    /// Component (or `log` target when no component was given) that produced the record.
    pub component: Ustr,
    /// Formatted message text.
    pub message: String,
}
92
93impl Display for LogLine {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
96 }
97}
98
/// Wraps a [`LogLine`] together with the trader ID and lazily built string renderings.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The wrapped log line.
    line: LogLine,
    /// Cached plain rendering, populated on the first call to `get_string`.
    cache: Option<String>,
    /// Cached ANSI-colored rendering, populated on the first call to `get_colored`.
    colored: Option<String>,
    /// Trader ID included in every rendering.
    trader_id: Ustr,
}
116
117impl LogLineWrapper {
118 #[must_use]
120 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
121 Self {
122 line,
123 cache: None,
124 colored: None,
125 trader_id,
126 }
127 }
128
129 pub fn get_string(&mut self) -> &str {
134 self.cache.get_or_insert_with(|| {
135 format!(
136 "{} [{}] {}.{}: {}\n",
137 unix_nanos_to_iso8601(self.line.timestamp),
138 self.line.level,
139 self.trader_id,
140 &self.line.component,
141 &self.line.message,
142 )
143 })
144 }
145
146 pub fn get_colored(&mut self) -> &str {
152 self.colored.get_or_insert_with(|| {
153 format!(
154 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
155 unix_nanos_to_iso8601(self.line.timestamp),
156 &self.line.color.as_ansi(),
157 self.line.level,
158 self.trader_id,
159 &self.line.component,
160 &self.line.message,
161 )
162 })
163 }
164
165 #[must_use]
174 pub fn get_json(&self) -> String {
175 let json_string =
176 serde_json::to_string(&self).expect("Error serializing log event to string");
177 format!("{json_string}\n")
178 }
179}
180
181impl Serialize for LogLineWrapper {
182 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
183 where
184 S: Serializer,
185 {
186 let mut json_obj = IndexMap::new();
187 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
188 json_obj.insert("timestamp".to_string(), timestamp);
189 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
190 json_obj.insert("level".to_string(), self.line.level.to_string());
191 json_obj.insert("color".to_string(), self.line.color.to_string());
192 json_obj.insert("component".to_string(), self.line.component.to_string());
193 json_obj.insert("message".to_string(), self.line.message.clone());
194
195 json_obj.serialize(serializer)
196 }
197}
198
199impl Log for Logger {
200 fn enabled(&self, metadata: &log::Metadata) -> bool {
201 !LOGGING_BYPASSED.load(Ordering::Relaxed)
202 && (metadata.level() == Level::Error
203 || metadata.level() <= self.config.stdout_level
204 || metadata.level() <= self.config.fileout_level)
205 }
206
207 fn log(&self, record: &log::Record) {
208 if self.enabled(record.metadata()) {
209 let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
210 get_atomic_clock_realtime().get_time_ns()
211 } else {
212 get_atomic_clock_static().get_time_ns()
213 };
214 let level = record.level();
215 let key_values = record.key_values();
216 let color: LogColor = key_values
217 .get(KV_COLOR.into())
218 .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
219 .unwrap_or(level.into());
220 let component = key_values.get(KV_COMPONENT.into()).map_or_else(
221 || Ustr::from(record.metadata().target()),
222 |v| Ustr::from(&v.to_string()),
223 );
224
225 let line = LogLine {
226 timestamp,
227 level,
228 color,
229 component,
230 message: format!("{}", record.args()),
231 };
232 if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
233 eprintln!("Error sending log event (receiver closed): {line}");
234 }
235 }
236 }
237
238 fn flush(&self) {
239 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
241 return;
242 }
243
244 if let Err(e) = self.tx.send(LogEvent::Flush) {
245 eprintln!("Error sending flush log event: {e}");
246 }
247 }
248}
249
250#[allow(clippy::too_many_arguments)]
251impl Logger {
252 pub fn init_with_env(
258 trader_id: TraderId,
259 instance_id: UUID4,
260 file_config: FileWriterConfig,
261 ) -> anyhow::Result<LogGuard> {
262 let config = LoggerConfig::from_env()?;
263 Self::init_with_config(trader_id, instance_id, config, file_config)
264 }
265
266 pub fn init_with_config(
272 trader_id: TraderId,
273 instance_id: UUID4,
274 config: LoggerConfig,
275 file_config: FileWriterConfig,
276 ) -> anyhow::Result<LogGuard> {
277 if super::LOGGING_INITIALIZED.load(Ordering::SeqCst) {
279 return LogGuard::new()
280 .ok_or_else(|| anyhow::anyhow!("Logging already initialized but sender missing"));
281 }
282
283 let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
284
285 let logger_tx = tx.clone();
286 let logger = Self {
287 tx: logger_tx,
288 config: config.clone(),
289 };
290
291 set_boxed_logger(Box::new(logger))?;
292
293 if LOGGER_TX.set(tx).is_err() {
295 debug_assert!(
296 false,
297 "LOGGER_TX already set - re-initialization not supported"
298 );
299 }
300
301 let is_colored = config.is_colored;
302
303 let print_config = config.print_config;
304 if print_config {
305 println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
306 println!("Logger initialized with {config:?} {file_config:?}");
307 }
308
309 let handle = std::thread::Builder::new()
310 .name(LOGGING.to_string())
311 .spawn(move || {
312 Self::handle_messages(
313 trader_id.to_string(),
314 instance_id.to_string(),
315 config,
316 file_config,
317 rx,
318 );
319 })?;
320
321 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
323 debug_assert!(
324 handle_guard.is_none(),
325 "LOGGER_HANDLE already set - re-initialization not supported"
326 );
327 *handle_guard = Some(handle);
328 }
329
330 let max_level = log::LevelFilter::Trace;
331 set_max_level(max_level);
332
333 if print_config {
334 println!("Logger set as `log` implementation with max level {max_level}");
335 }
336
337 super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
338 super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
339
340 LogGuard::new()
341 .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
342 }
343
344 fn handle_messages(
345 trader_id: String,
346 instance_id: String,
347 config: LoggerConfig,
348 file_config: FileWriterConfig,
349 rx: std::sync::mpsc::Receiver<LogEvent>,
350 ) {
351 let LoggerConfig {
352 stdout_level,
353 fileout_level,
354 component_level,
355 module_level,
356 log_components_only,
357 is_colored,
358 print_config: _,
359 use_tracing: _,
360 } = config;
361
362 let mut module_filters_sorted: Vec<(Ustr, LevelFilter)> =
364 module_level.into_iter().collect();
365 module_filters_sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
366
367 let trader_id_cache = Ustr::from(&trader_id);
368
369 let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
371 let mut stderr_writer = StderrWriter::new(is_colored);
372
373 let mut file_writer_opt = if fileout_level == LevelFilter::Off {
375 None
376 } else {
377 FileWriter::new(trader_id, instance_id, file_config, fileout_level)
378 };
379
380 let process_event = |event: LogEvent,
381 stdout_writer: &mut StdoutWriter,
382 stderr_writer: &mut StderrWriter,
383 file_writer_opt: &mut Option<FileWriter>| {
384 match event {
385 LogEvent::Log(line) => {
386 if should_filter_log(
387 &line.component,
388 line.level,
389 &module_filters_sorted,
390 &component_level,
391 log_components_only,
392 ) {
393 return;
394 }
395
396 let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
397
398 if stderr_writer.enabled(&wrapper.line) {
399 if is_colored {
400 stderr_writer.write(wrapper.get_colored());
401 } else {
402 stderr_writer.write(wrapper.get_string());
403 }
404 }
405
406 if stdout_writer.enabled(&wrapper.line) {
407 if is_colored {
408 stdout_writer.write(wrapper.get_colored());
409 } else {
410 stdout_writer.write(wrapper.get_string());
411 }
412 }
413
414 if let Some(file_writer) = file_writer_opt
415 && file_writer.enabled(&wrapper.line)
416 {
417 if file_writer.json_format {
418 file_writer.write(&wrapper.get_json());
419 } else {
420 file_writer.write(wrapper.get_string());
421 }
422 }
423 }
424 LogEvent::Flush => {
425 stdout_writer.flush();
426 stderr_writer.flush();
427
428 if let Some(file_writer) = file_writer_opt {
429 file_writer.flush();
430 }
431 }
432 LogEvent::Close => {
433 }
435 }
436 };
437
438 while let Ok(event) = rx.recv() {
440 match event {
441 LogEvent::Log(_) | LogEvent::Flush => process_event(
442 event,
443 &mut stdout_writer,
444 &mut stderr_writer,
445 &mut file_writer_opt,
446 ),
447 LogEvent::Close => {
448 stdout_writer.flush();
450 stderr_writer.flush();
451
452 if let Some(ref mut file_writer) = file_writer_opt {
453 file_writer.flush();
454 }
455
456 while let Ok(evt) = rx.try_recv() {
459 match evt {
460 LogEvent::Close => (), _ => process_event(
462 evt,
463 &mut stdout_writer,
464 &mut stderr_writer,
465 &mut file_writer_opt,
466 ),
467 }
468 }
469
470 stdout_writer.flush();
472 stderr_writer.flush();
473
474 if let Some(ref mut file_writer) = file_writer_opt {
475 file_writer.flush();
476 }
477
478 break;
479 }
480 }
481 }
482 }
483}
484
485#[must_use]
492pub fn should_filter_log(
493 component: &Ustr,
494 line_level: log::Level,
495 module_filters_sorted: &[(Ustr, LevelFilter)],
496 component_level: &AHashMap<Ustr, LevelFilter>,
497 log_components_only: bool,
498) -> bool {
499 if module_filters_sorted.is_empty() && component_level.is_empty() {
500 return log_components_only;
501 }
502
503 let module_filter = module_filters_sorted
505 .iter()
506 .find(|(path, _)| component.starts_with(path.as_str()))
507 .map(|(_, level)| *level);
508
509 let component_filter = component_level.get(component).copied();
510
511 if log_components_only && module_filter.is_none() && component_filter.is_none() {
512 return true;
513 }
514
515 if let Some(filter_level) = module_filter.or(component_filter)
517 && line_level > filter_level
518 {
519 return true;
520 }
521
522 false
523}
524
525pub(crate) fn shutdown_graceful() {
534 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
536 log::set_max_level(log::LevelFilter::Off);
537
538 if let Some(tx) = LOGGER_TX.get() {
540 let _ = tx.send(LogEvent::Close);
541 }
542
543 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
544 && let Some(handle) = handle_guard.take()
545 && handle.thread().id() != std::thread::current().id()
546 {
547 let _ = handle.join();
548 }
549
550 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
551}
552
553pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
554 let color = Value::from(color as u8);
555
556 match level {
557 LogLevel::Off => {}
558 LogLevel::Trace => {
559 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
560 }
561 LogLevel::Debug => {
562 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
563 }
564 LogLevel::Info => {
565 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
566 }
567 LogLevel::Warning => {
568 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
569 }
570 LogLevel::Error => {
571 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
572 }
573 }
574}
575
/// Guard tied to the logging system's lifetime.
///
/// Each guard counts towards `LOGGING_GUARDS_ACTIVE`; dropping the last one closes and
/// joins the logging thread, while dropping any other guard only requests a flush.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Sender used to deliver `Flush`/`Close` events when the guard is dropped.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
610
611impl LogGuard {
612 #[must_use]
620 pub fn new() -> Option<Self> {
621 LOGGER_TX.get().map(|tx| {
622 LOGGING_GUARDS_ACTIVE
623 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
624 if count == u8::MAX {
625 None } else {
627 Some(count + 1)
628 }
629 })
630 .expect("Maximum number of active LogGuards (255) exceeded");
631
632 Self { tx: tx.clone() }
633 })
634 }
635}
636
637impl Drop for LogGuard {
638 fn drop(&mut self) {
643 let previous_count = LOGGING_GUARDS_ACTIVE
644 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
645 assert!(count != 0, "LogGuard reference count underflow");
646 Some(count - 1)
647 })
648 .expect("Failed to decrement LogGuard count");
649
650 if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
652 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
656
657 log::set_max_level(log::LevelFilter::Off);
659
660 let _ = self.tx.send(LogEvent::Close);
662
663 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
665 && let Some(handle) = handle_guard.take()
666 {
667 if handle.thread().id() != std::thread::current().id() {
669 let _ = handle.join();
670 }
671 }
672
673 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
675 } else {
676 let _ = self.tx.send(LogEvent::Flush);
678 }
679 }
680}
681
682#[cfg(test)]
683mod tests {
684 use std::time::Duration;
685
686 use ahash::AHashMap;
687 use log::LevelFilter;
688 use nautilus_core::UUID4;
689 use nautilus_model::identifiers::TraderId;
690 use rstest::*;
691 use serde_json::Value;
692 use tempfile::tempdir;
693 use ustr::Ustr;
694
695 use super::*;
696 use crate::{
697 enums::LogColor,
698 logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
699 testing::wait_until,
700 };
701
702 #[rstest]
703 fn log_message_serialization() {
704 let log_message = LogLine {
705 timestamp: UnixNanos::default(),
706 level: log::Level::Info,
707 color: LogColor::Normal,
708 component: Ustr::from("Portfolio"),
709 message: "This is a log message".to_string(),
710 };
711
712 let serialized_json = serde_json::to_string(&log_message).unwrap();
713 let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
714
715 assert_eq!(deserialized_value["level"], "INFO");
716 assert_eq!(deserialized_value["component"], "Portfolio");
717 assert_eq!(deserialized_value["message"], "This is a log message");
718 }
719
720 #[rstest]
721 fn log_config_parsing() {
722 let config =
723 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
724 .unwrap();
725 assert_eq!(
726 config,
727 LoggerConfig {
728 stdout_level: LevelFilter::Info,
729 fileout_level: LevelFilter::Debug,
730 component_level: AHashMap::from_iter(vec![(
731 Ustr::from("RiskEngine"),
732 LevelFilter::Error
733 )]),
734 module_level: AHashMap::new(),
735 log_components_only: false,
736 is_colored: true,
737 print_config: false,
738 use_tracing: false,
739 }
740 );
741 }
742
743 #[rstest]
744 fn log_config_parsing2() {
745 let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
746 assert_eq!(
747 config,
748 LoggerConfig {
749 stdout_level: LevelFilter::Warn,
750 fileout_level: LevelFilter::Error,
751 component_level: AHashMap::new(),
752 module_level: AHashMap::new(),
753 log_components_only: false,
754 is_colored: true,
755 print_config: true,
756 use_tracing: false,
757 }
758 );
759 }
760
761 #[rstest]
762 fn log_config_parsing_with_log_components_only() {
763 let config =
764 LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
765 assert_eq!(
766 config,
767 LoggerConfig {
768 stdout_level: LevelFilter::Info,
769 fileout_level: LevelFilter::Off,
770 component_level: AHashMap::from_iter(vec![(
771 Ustr::from("RiskEngine"),
772 LevelFilter::Debug
773 )]),
774 module_level: AHashMap::new(),
775 log_components_only: true,
776 is_colored: true,
777 print_config: false,
778 use_tracing: false,
779 }
780 );
781 }
782
783 #[rstest]
784 fn test_log_line_wrapper_plain_string() {
785 let line = LogLine {
786 timestamp: 1_650_000_000_000_000_000.into(),
787 level: log::Level::Info,
788 color: LogColor::Normal,
789 component: Ustr::from("TestComponent"),
790 message: "Test message".to_string(),
791 };
792
793 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
794 let result = wrapper.get_string();
795
796 assert!(result.contains("TRADER-001"));
797 assert!(result.contains("TestComponent"));
798 assert!(result.contains("Test message"));
799 assert!(result.contains("[INFO]"));
800 assert!(result.ends_with('\n'));
801 assert!(!result.contains("\x1b["));
803 }
804
805 #[rstest]
806 fn test_log_line_wrapper_colored_string() {
807 let line = LogLine {
808 timestamp: 1_650_000_000_000_000_000.into(),
809 level: log::Level::Info,
810 color: LogColor::Green,
811 component: Ustr::from("TestComponent"),
812 message: "Test message".to_string(),
813 };
814
815 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
816 let result = wrapper.get_colored();
817
818 assert!(result.contains("TRADER-001"));
819 assert!(result.contains("TestComponent"));
820 assert!(result.contains("Test message"));
821 assert!(result.contains("\x1b["));
823 assert!(result.ends_with('\n'));
824 }
825
826 #[rstest]
827 fn test_log_line_wrapper_json_output() {
828 let line = LogLine {
829 timestamp: 1_650_000_000_000_000_000.into(),
830 level: log::Level::Warn,
831 color: LogColor::Yellow,
832 component: Ustr::from("RiskEngine"),
833 message: "Warning message".to_string(),
834 };
835
836 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
837 let json = wrapper.get_json();
838
839 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
840 assert_eq!(parsed["trader_id"], "TRADER-002");
841 assert_eq!(parsed["component"], "RiskEngine");
842 assert_eq!(parsed["message"], "Warning message");
843 assert_eq!(parsed["level"], "WARN");
844 assert_eq!(parsed["color"], "YELLOW");
845 }
846
847 #[rstest]
848 fn test_log_line_wrapper_caches_string() {
849 let line = LogLine {
850 timestamp: 1_650_000_000_000_000_000.into(),
851 level: log::Level::Info,
852 color: LogColor::Normal,
853 component: Ustr::from("Test"),
854 message: "Cached".to_string(),
855 };
856
857 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
858 let first = wrapper.get_string().to_string();
859 let second = wrapper.get_string().to_string();
860
861 assert_eq!(first, second);
862 }
863
864 #[rstest]
865 fn test_log_line_display() {
866 let line = LogLine {
867 timestamp: 0.into(),
868 level: log::Level::Error,
869 color: LogColor::Red,
870 component: Ustr::from("Component"),
871 message: "Error occurred".to_string(),
872 };
873
874 let display = format!("{line}");
875 assert_eq!(display, "[ERROR] Component: Error occurred");
876 }
877
878 fn sorted_module_filters(map: AHashMap<Ustr, LevelFilter>) -> Vec<(Ustr, LevelFilter)> {
880 let mut v: Vec<_> = map.into_iter().collect();
881 v.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
882 v
883 }
884
885 #[rstest]
886 fn test_filter_no_filters_passes_all() {
887 let module_filters = vec![];
888 let component_level = AHashMap::new();
889
890 assert!(!should_filter_log(
891 &Ustr::from("anything"),
892 Level::Trace,
893 &module_filters,
894 &component_level,
895 false
896 ));
897 }
898
899 #[rstest]
900 fn test_filter_component_exact_match() {
901 let module_filters = vec![];
902 let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Error)]);
903
904 assert!(should_filter_log(
905 &Ustr::from("RiskEngine"),
906 Level::Info,
907 &module_filters,
908 &component_level,
909 false
910 ));
911 assert!(!should_filter_log(
912 &Ustr::from("RiskEngine"),
913 Level::Error,
914 &module_filters,
915 &component_level,
916 false
917 ));
918 assert!(!should_filter_log(
919 &Ustr::from("Portfolio"),
920 Level::Info,
921 &module_filters,
922 &component_level,
923 false
924 ));
925 }
926
927 #[rstest]
928 fn test_filter_module_prefix_match() {
929 let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
930 let component_level = AHashMap::new();
931
932 assert!(!should_filter_log(
933 &Ustr::from("nautilus_okx::websocket"),
934 Level::Debug,
935 &module_filters,
936 &component_level,
937 false
938 ));
939 assert!(!should_filter_log(
940 &Ustr::from("nautilus_okx::websocket::handler"),
941 Level::Debug,
942 &module_filters,
943 &component_level,
944 false
945 ));
946 assert!(should_filter_log(
947 &Ustr::from("nautilus_okx::websocket::handler"),
948 Level::Trace,
949 &module_filters,
950 &component_level,
951 false
952 ));
953 assert!(!should_filter_log(
954 &Ustr::from("nautilus_binance::data"),
955 Level::Trace,
956 &module_filters,
957 &component_level,
958 false
959 ));
960 }
961
962 #[rstest]
963 fn test_filter_longest_prefix_wins() {
964 let module_filters = sorted_module_filters(AHashMap::from_iter([
965 (Ustr::from("nautilus_okx"), LevelFilter::Error),
966 (Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug),
967 ]));
968 let component_level = AHashMap::new();
969
970 assert!(!should_filter_log(
971 &Ustr::from("nautilus_okx::websocket::handler"),
972 Level::Debug,
973 &module_filters,
974 &component_level,
975 false
976 ));
977 assert!(should_filter_log(
978 &Ustr::from("nautilus_okx::data"),
979 Level::Debug,
980 &module_filters,
981 &component_level,
982 false
983 ));
984 assert!(!should_filter_log(
985 &Ustr::from("nautilus_okx::data"),
986 Level::Error,
987 &module_filters,
988 &component_level,
989 false
990 ));
991 }
992
993 #[rstest]
994 fn test_filter_module_precedence_over_component() {
995 let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
996 let component_level =
997 AHashMap::from_iter([(Ustr::from("nautilus_okx::websocket"), LevelFilter::Error)]);
998
999 assert!(!should_filter_log(
1000 &Ustr::from("nautilus_okx::websocket"),
1001 Level::Debug,
1002 &module_filters,
1003 &component_level,
1004 false
1005 ));
1006 }
1007
1008 #[rstest]
1009 fn test_filter_log_components_only_blocks_unknown() {
1010 let module_filters = vec![];
1011 let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Debug)]);
1012
1013 assert!(should_filter_log(
1014 &Ustr::from("Portfolio"),
1015 Level::Info,
1016 &module_filters,
1017 &component_level,
1018 true
1019 ));
1020 assert!(!should_filter_log(
1021 &Ustr::from("RiskEngine"),
1022 Level::Info,
1023 &module_filters,
1024 &component_level,
1025 true
1026 ));
1027 }
1028
1029 #[rstest]
1030 fn test_filter_log_components_only_with_module() {
1031 let module_filters = vec![(Ustr::from("nautilus_okx"), LevelFilter::Debug)];
1032 let component_level = AHashMap::new();
1033
1034 assert!(!should_filter_log(
1035 &Ustr::from("nautilus_okx::websocket"),
1036 Level::Debug,
1037 &module_filters,
1038 &component_level,
1039 true
1040 ));
1041 assert!(should_filter_log(
1042 &Ustr::from("nautilus_binance::data"),
1043 Level::Debug,
1044 &module_filters,
1045 &component_level,
1046 true
1047 ));
1048 }
1049
1050 #[rstest]
1051 fn test_filter_level_comparison() {
1052 let module_filters = vec![];
1053 let component_level = AHashMap::from_iter([(Ustr::from("Test"), LevelFilter::Warn)]);
1054
1055 assert!(!should_filter_log(
1056 &Ustr::from("Test"),
1057 Level::Error,
1058 &module_filters,
1059 &component_level,
1060 false
1061 ));
1062 assert!(!should_filter_log(
1063 &Ustr::from("Test"),
1064 Level::Warn,
1065 &module_filters,
1066 &component_level,
1067 false
1068 ));
1069 assert!(should_filter_log(
1070 &Ustr::from("Test"),
1071 Level::Info,
1072 &module_filters,
1073 &component_level,
1074 false
1075 ));
1076 assert!(should_filter_log(
1077 &Ustr::from("Test"),
1078 Level::Debug,
1079 &module_filters,
1080 &component_level,
1081 false
1082 ));
1083 assert!(should_filter_log(
1084 &Ustr::from("Test"),
1085 Level::Trace,
1086 &module_filters,
1087 &component_level,
1088 false
1089 ));
1090 }
1091
1092 mod serial_tests {
1095 use std::sync::atomic::Ordering;
1096
1097 use super::*;
1098 use crate::logging::{LOGGING_BYPASSED, logging_is_initialized, logging_set_bypass};
1099
1100 #[rstest]
1101 fn test_logging_to_file() {
1102 let config = LoggerConfig {
1103 fileout_level: LevelFilter::Debug,
1104 ..Default::default()
1105 };
1106
1107 let temp_dir = tempdir().expect("Failed to create temporary directory");
1108 let file_config = FileWriterConfig {
1109 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1110 ..Default::default()
1111 };
1112
1113 let log_guard = Logger::init_with_config(
1114 TraderId::from("TRADER-001"),
1115 UUID4::new(),
1116 config,
1117 file_config,
1118 );
1119
1120 logging_clock_set_static_mode();
1121 logging_clock_set_static_time(1_650_000_000_000_000);
1122
1123 log::info!(
1124 component = "RiskEngine";
1125 "This is a test"
1126 );
1127
1128 let mut log_contents = String::new();
1129
1130 wait_until(
1131 || {
1132 std::fs::read_dir(&temp_dir)
1133 .expect("Failed to read directory")
1134 .filter_map(Result::ok)
1135 .any(|entry| entry.path().is_file())
1136 },
1137 Duration::from_secs(3),
1138 );
1139
1140 drop(log_guard); wait_until(
1143 || {
1144 let log_file_path = std::fs::read_dir(&temp_dir)
1145 .expect("Failed to read directory")
1146 .filter_map(Result::ok)
1147 .find(|entry| entry.path().is_file())
1148 .expect("No files found in directory")
1149 .path();
1150 log_contents = std::fs::read_to_string(log_file_path)
1151 .expect("Error while reading log file");
1152 !log_contents.is_empty()
1153 },
1154 Duration::from_secs(3),
1155 );
1156
1157 assert_eq!(
1158 log_contents,
1159 "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test\n"
1160 );
1161 }
1162
1163 #[rstest]
1164 fn test_shutdown_drains_backlog_tail() {
1165 const N: usize = 1000;
1166
1167 let config = LoggerConfig {
1169 stdout_level: LevelFilter::Off,
1170 fileout_level: LevelFilter::Info,
1171 ..Default::default()
1172 };
1173
1174 let temp_dir = tempdir().expect("Failed to create temporary directory");
1175 let file_config = FileWriterConfig {
1176 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1177 ..Default::default()
1178 };
1179
1180 let log_guard = Logger::init_with_config(
1181 TraderId::from("TRADER-TAIL"),
1182 UUID4::new(),
1183 config,
1184 file_config,
1185 )
1186 .expect("Failed to initialize logger");
1187
1188 logging_clock_set_static_mode();
1190 logging_clock_set_static_time(1_700_000_000_000_000);
1191
1192 for i in 0..N {
1194 log::info!(component = "TailDrain"; "BacklogTest {i}");
1195 }
1196
1197 drop(log_guard);
1199
1200 let mut count = 0usize;
1202 wait_until(
1203 || {
1204 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1205 .expect("Failed to read directory")
1206 .filter_map(Result::ok)
1207 .find(|entry| entry.path().is_file())
1208 {
1209 let log_file_path = log_file.path();
1210 if let Ok(contents) = std::fs::read_to_string(log_file_path) {
1211 count = contents
1212 .lines()
1213 .filter(|l| l.contains("BacklogTest "))
1214 .count();
1215 count >= N
1216 } else {
1217 false
1218 }
1219 } else {
1220 false
1221 }
1222 },
1223 Duration::from_secs(5),
1224 );
1225
1226 assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
1227 }
1228
1229 #[rstest]
1230 fn test_log_component_level_filtering() {
1231 let config =
1232 LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
1233
1234 let temp_dir = tempdir().expect("Failed to create temporary directory");
1235 let file_config = FileWriterConfig {
1236 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1237 ..Default::default()
1238 };
1239
1240 let log_guard = Logger::init_with_config(
1241 TraderId::from("TRADER-001"),
1242 UUID4::new(),
1243 config,
1244 file_config,
1245 );
1246
1247 logging_clock_set_static_mode();
1248 logging_clock_set_static_time(1_650_000_000_000_000);
1249
1250 log::info!(
1251 component = "RiskEngine";
1252 "This is a test"
1253 );
1254
1255 drop(log_guard); wait_until(
1258 || {
1259 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1260 .expect("Failed to read directory")
1261 .filter_map(Result::ok)
1262 .find(|entry| entry.path().is_file())
1263 {
1264 let log_file_path = log_file.path();
1265 let log_contents = std::fs::read_to_string(log_file_path)
1266 .expect("Error while reading log file");
1267 !log_contents.contains("RiskEngine")
1268 } else {
1269 false
1270 }
1271 },
1272 Duration::from_secs(3),
1273 );
1274
1275 assert!(
1276 std::fs::read_dir(&temp_dir)
1277 .expect("Failed to read directory")
1278 .filter_map(Result::ok)
1279 .any(|entry| entry.path().is_file()),
1280 "Log file exists"
1281 );
1282 }
1283
1284 #[rstest]
1285 fn test_logging_to_file_in_json_format() {
1286 let config =
1287 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1288 .unwrap();
1289
1290 let temp_dir = tempdir().expect("Failed to create temporary directory");
1291 let file_config = FileWriterConfig {
1292 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1293 file_format: Some("json".to_string()),
1294 ..Default::default()
1295 };
1296
1297 let log_guard = Logger::init_with_config(
1298 TraderId::from("TRADER-001"),
1299 UUID4::new(),
1300 config,
1301 file_config,
1302 );
1303
1304 logging_clock_set_static_mode();
1305 logging_clock_set_static_time(1_650_000_000_000_000);
1306
1307 log::info!(
1308 component = "RiskEngine";
1309 "This is a test"
1310 );
1311
1312 let mut log_contents = String::new();
1313
1314 drop(log_guard); wait_until(
1317 || {
1318 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1319 .expect("Failed to read directory")
1320 .filter_map(Result::ok)
1321 .find(|entry| entry.path().is_file())
1322 {
1323 let log_file_path = log_file.path();
1324 log_contents = std::fs::read_to_string(log_file_path)
1325 .expect("Error while reading log file");
1326 !log_contents.is_empty()
1327 } else {
1328 false
1329 }
1330 },
1331 Duration::from_secs(3),
1332 );
1333
1334 assert_eq!(
1335 log_contents,
1336 "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test\"}\n"
1337 );
1338 }
1339
1340 #[rstest]
1341 fn test_init_sets_logging_is_initialized_flag() {
1342 let config = LoggerConfig::default();
1343 let file_config = FileWriterConfig::default();
1344
1345 let guard = Logger::init_with_config(
1346 TraderId::from("TRADER-001"),
1347 UUID4::new(),
1348 config,
1349 file_config,
1350 );
1351 assert!(guard.is_ok());
1352 assert!(logging_is_initialized());
1353
1354 drop(guard);
1355 assert!(!logging_is_initialized());
1356 }
1357
1358 #[rstest]
1359 fn test_reinit_after_guard_drop_fails() {
1360 let config = LoggerConfig::default();
1361 let file_config = FileWriterConfig::default();
1362
1363 let guard1 = Logger::init_with_config(
1364 TraderId::from("TRADER-001"),
1365 UUID4::new(),
1366 config.clone(),
1367 file_config.clone(),
1368 );
1369 assert!(guard1.is_ok());
1370 drop(guard1);
1371
1372 let guard2 = Logger::init_with_config(
1374 TraderId::from("TRADER-002"),
1375 UUID4::new(),
1376 config,
1377 file_config,
1378 );
1379 assert!(guard2.is_err());
1380 }
1381
1382 #[rstest]
1383 fn test_bypass_before_init_prevents_logging() {
1384 logging_set_bypass();
1385 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1386
1387 let temp_dir = tempdir().expect("Failed to create temporary directory");
1388 let config = LoggerConfig {
1389 fileout_level: LevelFilter::Debug,
1390 ..Default::default()
1391 };
1392 let file_config = FileWriterConfig {
1393 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1394 ..Default::default()
1395 };
1396
1397 let guard = Logger::init_with_config(
1398 TraderId::from("TRADER-001"),
1399 UUID4::new(),
1400 config,
1401 file_config,
1402 );
1403 assert!(guard.is_ok());
1404
1405 log::info!(
1406 component = "TestComponent";
1407 "This should be bypassed"
1408 );
1409 std::thread::sleep(Duration::from_millis(100));
1410 drop(guard);
1411
1412 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1414 }
1415
1416 #[rstest]
1417 fn test_module_level_filtering() {
1418 let config = LoggerConfig::from_spec(
1422 "stdout=Off;fileout=Trace;nautilus::adapters=Warn;nautilus::adapters::okx=Debug",
1423 )
1424 .unwrap();
1425
1426 let temp_dir = tempdir().expect("Failed to create temporary directory");
1427 let file_config = FileWriterConfig {
1428 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1429 ..Default::default()
1430 };
1431
1432 let log_guard = Logger::init_with_config(
1433 TraderId::from("TRADER-MOD"),
1434 UUID4::new(),
1435 config,
1436 file_config,
1437 )
1438 .expect("Failed to initialize logger");
1439
1440 logging_clock_set_static_mode();
1441 logging_clock_set_static_time(1_650_000_000_000_000);
1442
1443 log::debug!(
1445 component = "nautilus::adapters::okx::websocket";
1446 "OKX debug message"
1447 );
1448
1449 log::info!(
1451 component = "nautilus::adapters::okx";
1452 "OKX info message"
1453 );
1454
1455 log::info!(
1457 component = "nautilus::adapters::binance";
1458 "Binance info message SHOULD NOT APPEAR"
1459 );
1460
1461 log::warn!(
1463 component = "nautilus::adapters::binance";
1464 "Binance warn message"
1465 );
1466
1467 log::trace!(
1469 component = "Portfolio";
1470 "Portfolio trace message"
1471 );
1472
1473 drop(log_guard);
1474
1475 wait_until(
1476 || {
1477 std::fs::read_dir(&temp_dir)
1478 .expect("Failed to read directory")
1479 .filter_map(Result::ok)
1480 .any(|entry| entry.path().is_file())
1481 },
1482 Duration::from_secs(3),
1483 );
1484
1485 let log_file_path = std::fs::read_dir(&temp_dir)
1486 .expect("Failed to read directory")
1487 .filter_map(Result::ok)
1488 .find(|entry| entry.path().is_file())
1489 .expect("No log file found")
1490 .path();
1491
1492 let log_contents =
1493 std::fs::read_to_string(log_file_path).expect("Error reading log file");
1494
1495 assert!(
1496 log_contents.contains("OKX debug message"),
1497 "OKX debug should pass (longer prefix wins)"
1498 );
1499 assert!(
1500 log_contents.contains("OKX info message"),
1501 "OKX info should pass"
1502 );
1503 assert!(
1504 log_contents.contains("Binance warn message"),
1505 "Binance warn should pass"
1506 );
1507 assert!(
1508 log_contents.contains("Portfolio trace message"),
1509 "Unfiltered component should pass"
1510 );
1511 assert!(
1512 !log_contents.contains("SHOULD NOT APPEAR"),
1513 "Binance info should be filtered (adapters=Warn)"
1514 );
1515 }
1516 }
1517}