1use std::{
17 collections::HashMap,
18 env,
19 fmt::Display,
20 str::FromStr,
21 sync::{atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26 kv::{ToValue, Value},
27 set_boxed_logger, set_max_level, Level, LevelFilter, Log, STATIC_MAX_LEVEL,
28};
29use nautilus_core::{
30 datetime::unix_nanos_to_iso8601,
31 time::{get_atomic_clock_realtime, get_atomic_clock_static},
32 UnixNanos, UUID4,
33};
34use nautilus_model::identifiers::TraderId;
35use serde::{Deserialize, Serialize, Serializer};
36use ustr::Ustr;
37
38use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
39use crate::{
40 enums::{LogColor, LogLevel},
41 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
42};
43
/// Name assigned to the background thread that handles log events.
const LOGGING: &str = "logging";
45
/// Configuration for a [`Logger`]: output levels, per-component filters and
/// formatting flags.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Level filter passed to the stdout writer.
    pub stdout_level: LevelFilter,
    /// Level filter passed to the file writer; `LevelFilter::Off` means no
    /// file writer is created.
    pub fileout_level: LevelFilter,
    /// Per-component level filters, keyed by component name. A log line whose
    /// level is more verbose than its component's filter is skipped.
    component_level: HashMap<Ustr, LevelFilter>,
    /// Whether stdout/stderr output uses the ANSI-colored line format.
    pub is_colored: bool,
    /// Whether the configuration is printed to stdout when the logger is initialized.
    pub print_config: bool,
}
63
64impl Default for LoggerConfig {
65 fn default() -> Self {
67 Self {
68 stdout_level: LevelFilter::Info,
69 fileout_level: LevelFilter::Off,
70 component_level: HashMap::new(),
71 is_colored: false,
72 print_config: false,
73 }
74 }
75}
76
77impl LoggerConfig {
78 #[must_use]
80 pub const fn new(
81 stdout_level: LevelFilter,
82 fileout_level: LevelFilter,
83 component_level: HashMap<Ustr, LevelFilter>,
84 is_colored: bool,
85 print_config: bool,
86 ) -> Self {
87 Self {
88 stdout_level,
89 fileout_level,
90 component_level,
91 is_colored,
92 print_config,
93 }
94 }
95
96 #[must_use]
97 pub fn from_spec(spec: &str) -> Self {
98 let Self {
99 mut stdout_level,
100 mut fileout_level,
101 mut component_level,
102 mut is_colored,
103 mut print_config,
104 } = Self::default();
105 spec.split(';').for_each(|kv| {
106 if kv == "is_colored" {
107 is_colored = true;
108 } else if kv == "print_config" {
109 print_config = true;
110 } else {
111 let mut kv = kv.split('=');
112 if let (Some(k), Some(Ok(lvl))) = (kv.next(), kv.next().map(LevelFilter::from_str))
113 {
114 if k == "stdout" {
115 stdout_level = lvl;
116 } else if k == "fileout" {
117 fileout_level = lvl;
118 } else {
119 component_level.insert(Ustr::from(k), lvl);
120 }
121 }
122 }
123 });
124
125 Self {
126 stdout_level,
127 fileout_level,
128 component_level,
129 is_colored,
130 print_config,
131 }
132 }
133
134 #[must_use]
135 pub fn from_env() -> Self {
136 match env::var("NAUTILUS_LOG") {
137 Ok(spec) => Self::from_spec(&spec),
138 Err(e) => panic!("Error parsing `LoggerConfig` spec: {e}"),
139 }
140 }
141}
142
/// A `log::Log` implementation that forwards log records as [`LogEvent`]s
/// over an MPSC channel.
#[derive(Debug)]
pub struct Logger {
    /// The level filters and formatting flags for this logger.
    pub config: LoggerConfig,
    /// Sending half of the channel on which log events are emitted.
    tx: std::sync::mpsc::Sender<LogEvent>,
}
155
/// Events sent from a [`Logger`] to the log handling loop.
pub enum LogEvent {
    /// A log line to be written.
    Log(LogLine),
    /// A flush request; the handling loop in `Logger::handle_messages` exits
    /// when it receives this event.
    Flush,
}
163
/// A single log event, as captured from a `log::Record`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The log level of the event.
    pub level: Level,
    /// The color used when the line is rendered in the colored format.
    pub color: LogColor,
    /// The name of the component that produced the event.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}
176
177impl Display for LogLine {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
180 }
181}
182
/// Wraps a [`LogLine`] together with its timestamp and trader ID, and caches
/// the formatted string representations so each is built at most once.
pub struct LogLineWrapper {
    /// The underlying log line.
    line: LogLine,
    /// Cached plain formatted string, populated lazily by `get_string`.
    cache: Option<String>,
    /// Cached ANSI-colored formatted string, populated lazily by `get_colored`.
    colored: Option<String>,
    /// The event timestamp, pre-formatted as an ISO 8601 string.
    timestamp: String,
    /// The trader ID included in every formatted line.
    trader_id: Ustr,
}
201
202impl LogLineWrapper {
203 #[must_use]
205 pub fn new(line: LogLine, trader_id: Ustr, timestamp: UnixNanos) -> Self {
206 Self {
207 line,
208 cache: None,
209 colored: None,
210 timestamp: unix_nanos_to_iso8601(timestamp),
211 trader_id,
212 }
213 }
214
215 pub fn get_string(&mut self) -> &str {
220 self.cache.get_or_insert_with(|| {
221 format!(
222 "{} [{}] {}.{}: {}\n",
223 self.timestamp,
224 self.line.level,
225 self.trader_id,
226 &self.line.component,
227 &self.line.message,
228 )
229 })
230 }
231
232 pub fn get_colored(&mut self) -> &str {
238 self.colored.get_or_insert_with(|| {
239 format!(
240 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
241 self.timestamp,
242 &self.line.color.as_ansi(),
243 self.line.level,
244 self.trader_id,
245 &self.line.component,
246 &self.line.message,
247 )
248 })
249 }
250
251 #[must_use]
257 pub fn get_json(&self) -> String {
258 let json_string =
259 serde_json::to_string(&self).expect("Error serializing log event to string");
260 format!("{json_string}\n")
261 }
262}
263
264impl Serialize for LogLineWrapper {
265 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
266 where
267 S: Serializer,
268 {
269 let mut json_obj = IndexMap::new();
270 json_obj.insert("timestamp".to_string(), self.timestamp.clone());
271 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
272 json_obj.insert("level".to_string(), self.line.level.to_string());
273 json_obj.insert("color".to_string(), self.line.color.to_string());
274 json_obj.insert("component".to_string(), self.line.component.to_string());
275 json_obj.insert("message".to_string(), self.line.message.to_string());
276
277 json_obj.serialize(serializer)
278 }
279}
280
281impl Log for Logger {
282 fn enabled(&self, metadata: &log::Metadata) -> bool {
283 !LOGGING_BYPASSED.load(Ordering::Relaxed)
284 && (metadata.level() == Level::Error
285 || metadata.level() <= self.config.stdout_level
286 || metadata.level() <= self.config.fileout_level)
287 }
288
289 fn log(&self, record: &log::Record) {
290 if self.enabled(record.metadata()) {
291 let key_values = record.key_values();
292 let color = key_values
293 .get("color".into())
294 .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
295 .unwrap_or(LogColor::Normal);
296 let component = key_values.get("component".into()).map_or_else(
297 || Ustr::from(record.metadata().target()),
298 |v| Ustr::from(&v.to_string()),
299 );
300
301 let line = LogLine {
302 level: record.level(),
303 color,
304 component,
305 message: format!("{}", record.args()),
306 };
307 if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
308 eprintln!("Error sending log event (receiver closed): {line}");
309 }
310 }
311 }
312
313 fn flush(&self) {
314 if let Err(e) = self.tx.send(LogEvent::Flush) {
315 eprintln!("Error sending flush log event (receiver closed): {e}");
316 }
317 }
318}
319
320#[allow(clippy::too_many_arguments)]
321impl Logger {
322 #[must_use]
323 pub fn init_with_env(
324 trader_id: TraderId,
325 instance_id: UUID4,
326 file_config: FileWriterConfig,
327 ) -> LogGuard {
328 let config = LoggerConfig::from_env();
329 Self::init_with_config(trader_id, instance_id, config, file_config)
330 }
331
332 #[must_use]
342 pub fn init_with_config(
343 trader_id: TraderId,
344 instance_id: UUID4,
345 config: LoggerConfig,
346 file_config: FileWriterConfig,
347 ) -> LogGuard {
348 let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
349
350 let logger = Self {
351 tx,
352 config: config.clone(),
353 };
354
355 let print_config = config.print_config;
356 if print_config {
357 println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
358 println!("Logger initialized with {config:?} {file_config:?}");
359 }
360
361 let mut handle: Option<std::thread::JoinHandle<()>> = None;
362 match set_boxed_logger(Box::new(logger)) {
363 Ok(()) => {
364 handle = Some(
365 std::thread::Builder::new()
366 .name(LOGGING.to_string())
367 .spawn(move || {
368 Self::handle_messages(
369 trader_id.to_string(),
370 instance_id.to_string(),
371 config,
372 file_config,
373 rx,
374 );
375 })
376 .expect("Error spawning thread '{LOGGING}'"),
377 );
378
379 let max_level = log::LevelFilter::Trace;
380 set_max_level(max_level);
381 if print_config {
382 println!("Logger set as `log` implementation with max level {max_level}");
383 }
384 }
385 Err(e) => {
386 eprintln!("Cannot set logger because of error: {e}");
387 }
388 }
389
390 LogGuard::new(handle)
391 }
392
393 fn handle_messages(
394 trader_id: String,
395 instance_id: String,
396 config: LoggerConfig,
397 file_config: FileWriterConfig,
398 rx: std::sync::mpsc::Receiver<LogEvent>,
399 ) {
400 let LoggerConfig {
401 stdout_level,
402 fileout_level,
403 ref component_level,
404 is_colored,
405 print_config: _,
406 } = config;
407
408 let trader_id_cache = Ustr::from(&trader_id);
409
410 let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
412 let mut stderr_writer = StderrWriter::new(is_colored);
413
414 let mut file_writer_opt = if fileout_level == LevelFilter::Off {
416 None
417 } else {
418 FileWriter::new(trader_id, instance_id, file_config, fileout_level)
419 };
420
421 while let Ok(event) = rx.recv() {
423 match event {
424 LogEvent::Flush => {
425 break;
426 }
427 LogEvent::Log(line) => {
428 let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
429 get_atomic_clock_realtime().get_time_ns()
430 } else {
431 get_atomic_clock_static().get_time_ns()
432 };
433
434 let component_level = component_level.get(&line.component);
435
436 if let Some(&filter_level) = component_level {
439 if line.level > filter_level {
440 continue;
441 }
442 }
443
444 let mut wrapper = LogLineWrapper::new(line, trader_id_cache, timestamp);
445
446 if stderr_writer.enabled(&wrapper.line) {
447 if is_colored {
448 stderr_writer.write(wrapper.get_colored());
449 } else {
450 stderr_writer.write(wrapper.get_string());
451 }
452 }
453
454 if stdout_writer.enabled(&wrapper.line) {
455 if is_colored {
456 stdout_writer.write(wrapper.get_colored());
457 } else {
458 stdout_writer.write(wrapper.get_string());
459 }
460 }
461
462 if let Some(ref mut writer) = file_writer_opt {
463 if writer.enabled(&wrapper.line) {
464 if writer.json_format {
465 writer.write(&wrapper.get_json());
466 } else {
467 writer.write(wrapper.get_string());
468 }
469 }
470 }
471 }
472 }
473 }
474 }
475}
476
477pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
478 let color = Value::from(color as u8);
479
480 match level {
481 LogLevel::Off => {}
482 LogLevel::Trace => {
483 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
484 }
485 LogLevel::Debug => {
486 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
487 }
488 LogLevel::Info => {
489 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
490 }
491 LogLevel::Warning => {
492 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
493 }
494 LogLevel::Error => {
495 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
496 }
497 }
498}
499
/// Guard returned by logger initialization; on drop it flushes the global
/// logger and joins the logging thread, if it holds one.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    /// Handle of the logging thread, or `None` if no thread was spawned.
    handle: Option<std::thread::JoinHandle<()>>,
}
508
509impl LogGuard {
510 #[must_use]
512 pub const fn new(handle: Option<std::thread::JoinHandle<()>>) -> Self {
513 Self { handle }
514 }
515}
516
517impl Default for LogGuard {
518 fn default() -> Self {
520 Self::new(None)
521 }
522}
523
524impl Drop for LogGuard {
525 fn drop(&mut self) {
526 log::logger().flush();
527 if let Some(handle) = self.handle.take() {
528 handle.join().expect("Error joining logging handle");
529 }
530 }
531}
532
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, time::Duration};

    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    /// A `LogLine` serializes to JSON with the expected level, component and message.
    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }

    /// A spec with levels, a flag and a component filter parses into the expected config.
    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error");
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                is_colored: true,
                print_config: false,
            }
        );
    }

    /// A spec with a trailing `;` and the `print_config` flag parses into the expected config.
    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;");
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                is_colored: false,
                print_config: true,
            }
        );
    }

    /// A log line is written to the log file in the plain text format.
    #[rstest]
    fn test_logging_to_file() {
        let config = LoggerConfig {
            fileout_level: LevelFilter::Debug,
            ..Default::default()
        };

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        // Wait until a file shows up in the log directory
        wait_until(
            || {
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file())
            },
            Duration::from_secs(2),
        );

        // Dropping the guard flushes the logger and joins the logging thread
        drop(log_guard);

        // Wait until the log file has contents
        wait_until(
            || {
                let log_file_path = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                    .expect("No files found in directory")
                    .path();
                log_contents =
                    std::fs::read_to_string(log_file_path).expect("Error while reading log file");
                !log_contents.is_empty()
            },
            Duration::from_secs(2),
        );

        assert_eq!(
            log_contents,
            "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
        );
    }

    /// An `Info` line from a component filtered at `Error` does not appear in the log file.
    #[rstest]
    fn test_log_component_level_filtering() {
        let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        // Dropping the guard flushes the logger and joins the logging thread
        drop(log_guard);

        // Wait until a log file exists which does not mention the filtered component
        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    let log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.contains("RiskEngine")
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert!(
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file()),
            "Expected a log file to exist"
        );
    }

    /// A log line is written to the log file as a JSON object when the file format is `json`.
    #[rstest]
    fn test_logging_to_file_in_json_format() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info");

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            file_format: Some("json".to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        // Dropping the guard flushes the logger and joins the logging thread
        drop(log_guard);

        // Wait until the log file exists and has contents
        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                } else {
                    false
                }
            },
            Duration::from_secs(2),
        );

        assert_eq!(
            log_contents,
            "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
        );
    }
}