use std::{
    collections::HashMap,
    env,
    fmt::Display,
    str::FromStr,
    sync::{atomic::Ordering, mpsc::SendError},
};

use indexmap::IndexMap;
use log::{
    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
    kv::{ToValue, Value},
    set_boxed_logger, set_max_level,
};
use nautilus_core::{
    UUID4, UnixNanos,
    datetime::unix_nanos_to_iso8601,
    time::{get_atomic_clock_realtime, get_atomic_clock_static},
};
use nautilus_model::identifiers::TraderId;
use serde::{Deserialize, Serialize, Serializer};
use ustr::Ustr;

use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
use crate::{
    enums::{LogColor, LogLevel},
    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
};

const LOGGING: &str = "logging";

#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum log level to write to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum log level to write to the log file (`Off` disables file logging).
    pub fileout_level: LevelFilter,
    /// Per-component log level filters, applied in addition to the writer levels.
    component_level: HashMap<Ustr, LevelFilter>,
    /// If ANSI color codes should be used for console output.
    pub is_colored: bool,
    /// If the logger configuration should be printed to stdout at initialization.
    pub print_config: bool,
}

impl Default for LoggerConfig {
    fn default() -> Self {
        Self {
            stdout_level: LevelFilter::Info,
            fileout_level: LevelFilter::Off,
            component_level: HashMap::new(),
            is_colored: false,
            print_config: false,
        }
    }
}

impl LoggerConfig {
    #[must_use]
    pub const fn new(
        stdout_level: LevelFilter,
        fileout_level: LevelFilter,
        component_level: HashMap<Ustr, LevelFilter>,
        is_colored: bool,
        print_config: bool,
    ) -> Self {
        Self {
            stdout_level,
            fileout_level,
            component_level,
            is_colored,
            print_config,
        }
    }

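    /// Creates a logger configuration from a `;`-separated specification string.
    ///
    /// Each element is either a flag (`is_colored`, `print_config`) or a `key=level` pair,
    /// where the keys `stdout` and `fileout` set the writer levels and any other key is
    /// treated as a per-component level filter.
    ///
    /// A minimal usage sketch (the format mirrors the unit tests in this module):
    ///
    /// ```ignore
    /// let config = LoggerConfig::from_spec("stdout=Info;is_colored;RiskEngine=Error")?;
    /// assert_eq!(config.stdout_level, LevelFilter::Info);
    /// assert!(config.is_colored);
    /// ```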
    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
        let mut config = Self::default();
        for kv in spec.split(';') {
            let kv = kv.trim();
            if kv.is_empty() {
                continue;
            }
            let kv_lower = kv.to_lowercase();
            if kv_lower == "is_colored" {
                config.is_colored = true;
            } else if kv_lower == "print_config" {
                config.print_config = true;
            } else {
                let parts: Vec<&str> = kv.split('=').collect();
                if parts.len() != 2 {
                    anyhow::bail!("Invalid spec pair: {}", kv);
                }
                let k = parts[0].trim();
                let v = parts[1].trim();
                let lvl = LevelFilter::from_str(v)
                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
                let k_lower = k.to_lowercase();
                match k_lower.as_str() {
                    "stdout" => config.stdout_level = lvl,
                    "fileout" => config.fileout_level = lvl,
                    _ => {
                        config.component_level.insert(Ustr::from(k), lvl);
                    }
                }
            }
        }
        Ok(config)
    }

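    /// Creates a logger configuration from the `NAUTILUS_LOG` environment variable,
    /// parsed with [`Self::from_spec`]. Returns an error if the variable is unset or
    /// the spec is invalid.
    ///
    /// A usage sketch (the spec value shown is illustrative):
    ///
    /// ```ignore
    /// // With e.g. NAUTILUS_LOG="stdout=Info;RiskEngine=Error" set in the environment:
    /// let config = LoggerConfig::from_env()?;
    /// ```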
    pub fn from_env() -> anyhow::Result<Self> {
        let spec = env::var("NAUTILUS_LOG")?;
        Self::from_spec(&spec)
    }
}

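/// A logger implementing [`log::Log`] which sends log events over an MPSC channel to a
/// dedicated writer thread (spawned in [`Logger::init_with_config`]). The writer thread
/// applies per-component filtering and writes to stdout, stderr, and an optional file.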
#[derive(Debug)]
pub struct Logger {
    /// The logger configuration.
    pub config: LoggerConfig,
    /// Sending half of the channel to the logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}

pub enum LogEvent {
    /// A log line to be written by the logging thread.
    Log(LogLine),
    /// Sent by [`Log::flush`]; terminates the logging thread's event loop.
    Flush,
}

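/// A single log event, carrying the timestamp, level, color, component, and message
/// which the writer thread formats into its output lines.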
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp (UNIX nanoseconds) for the event.
    pub timestamp: UnixNanos,
    /// The log level for the event.
    pub level: Level,
    /// The color for the log message content.
    pub color: LogColor,
    /// The component which produced the log event.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}

impl Display for LogLine {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
    }
}

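/// Wraps a [`LogLine`] together with the trader ID and lazily builds the formatted output.
///
/// The plain and ANSI-colored representations are each built at most once and cached, so a
/// line routed to several writers is only formatted once per representation.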
pub struct LogLineWrapper {
    /// The wrapped log line.
    line: LogLine,
    /// Cached plain string representation of the log line.
    cache: Option<String>,
    /// Cached ANSI-colored string representation of the log line.
    colored: Option<String>,
    /// The trader ID associated with the log line.
    trader_id: Ustr,
}

impl LogLineWrapper {
    /// Creates a new [`LogLineWrapper`] instance.
    #[must_use]
    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
        Self {
            line,
            cache: None,
            colored: None,
            trader_id,
        }
    }

    /// Returns the plain formatted log line, building and caching it on first call.
    pub fn get_string(&mut self) -> &str {
        self.cache.get_or_insert_with(|| {
            format!(
                "{} [{}] {}.{}: {}\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

    /// Returns the ANSI-colored formatted log line, building and caching it on first call.
    pub fn get_colored(&mut self) -> &str {
        self.colored.get_or_insert_with(|| {
            format!(
                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                &self.line.color.as_ansi(),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

    /// Returns the log line serialized as a JSON string (with a trailing newline).
    #[must_use]
    pub fn get_json(&self) -> String {
        let json_string =
            serde_json::to_string(&self).expect("Error serializing log event to string");
        format!("{json_string}\n")
    }
}

impl Serialize for LogLineWrapper {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut json_obj = IndexMap::new();
        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
        json_obj.insert("timestamp".to_string(), timestamp);
        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
        json_obj.insert("level".to_string(), self.line.level.to_string());
        json_obj.insert("color".to_string(), self.line.color.to_string());
        json_obj.insert("component".to_string(), self.line.component.to_string());
        json_obj.insert("message".to_string(), self.line.message.to_string());

        json_obj.serialize(serializer)
    }
}

impl Log for Logger {
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            let color: LogColor = key_values
                .get("color".into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            let component = key_values.get("component".into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event (receiver closed): {e}");
        }
    }
}

#[allow(clippy::too_many_arguments)]
impl Logger {
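    /// Initializes logging from the `NAUTILUS_LOG` environment variable (see
    /// [`LoggerConfig::from_env`]), then delegates to [`Self::init_with_config`].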
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

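    /// Initializes the logger with the given configuration, registers it as the global
    /// [`log`] implementation, and spawns the `logging` writer thread.
    ///
    /// The returned [`LogGuard`] should be kept alive for as long as logging is required;
    /// dropping it flushes the writers and joins the logging thread.
    ///
    /// A minimal usage sketch mirroring the tests in this module (the trader ID and
    /// default configs here are illustrative):
    ///
    /// ```ignore
    /// let log_guard = Logger::init_with_config(
    ///     TraderId::from("TRADER-001"),
    ///     UUID4::new(),
    ///     LoggerConfig::default(),
    ///     FileWriterConfig::default(),
    /// )?;
    /// ```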
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger = Self {
            tx,
            config: config.clone(),
        };

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        let handle: Option<std::thread::JoinHandle<()>>;
        match set_boxed_logger(Box::new(logger)) {
            Ok(()) => {
                handle = Some(
                    std::thread::Builder::new()
                        .name(LOGGING.to_string())
                        .spawn(move || {
                            Self::handle_messages(
                                trader_id.to_string(),
                                instance_id.to_string(),
                                config,
                                file_config,
                                rx,
                            );
                        })?,
                );

                let max_level = log::LevelFilter::Trace;
                set_max_level(max_level);
                if print_config {
                    println!("Logger set as `log` implementation with max level {max_level}");
                }
            }
            Err(e) => {
                anyhow::bail!("Cannot initialize logger because of error: {e}");
            }
        }

        Ok(LogGuard::new(handle))
    }

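    /// Runs on the dedicated `logging` thread: receives [`LogEvent`]s from the channel,
    /// applies any per-component level filters, and writes each line to the stdout,
    /// stderr, and (if configured) file writers, until a `Flush` event is received or
    /// the channel is closed.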
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            ref component_level,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
        };

        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Flush => {
                    break;
                }
                LogEvent::Log(line) => {
                    let component_level = component_level.get(&line.component);

                    if let Some(&filter_level) = component_level {
                        if line.level > filter_level {
                            continue;
                        }
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(ref mut writer) = file_writer_opt {
                        if writer.enabled(&wrapper.line) {
                            if writer.json_format {
                                writer.write(&wrapper.get_json());
                            } else {
                                writer.write(wrapper.get_string());
                            }
                        }
                    }
                }
            }
        }
    }
}

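/// Logs a message through the global `log` facade, attaching `component` and `color`
/// as structured key-values which the [`Logger`] extracts when building the [`LogLine`].
///
/// A minimal usage sketch (the component name is illustrative):
///
/// ```ignore
/// log(
///     LogLevel::Info,
///     LogColor::Normal,
///     Ustr::from("RiskEngine"),
///     "This is a test.",
/// );
/// ```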
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
    let color = Value::from(color as u8);

    match level {
        LogLevel::Off => {}
        LogLevel::Trace => {
            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Debug => {
            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Info => {
            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Warning => {
            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Error => {
            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
    }
}

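/// A guard which keeps the logging thread alive and shuts it down cleanly on drop.
///
/// Dropping the guard flushes the global logger and joins the logging thread handle,
/// ensuring buffered log lines are written out before the guard's owner exits.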
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    handle: Option<std::thread::JoinHandle<()>>,
}

impl LogGuard {
    #[must_use]
    pub const fn new(handle: Option<std::thread::JoinHandle<()>>) -> Self {
        Self { handle }
    }
}

impl Default for LogGuard {
    fn default() -> Self {
        Self::new(None)
    }
}

impl Drop for LogGuard {
    fn drop(&mut self) {
        log::logger().flush();
        if let Some(handle) = self.handle.take() {
            handle.join().expect("Error joining logging handle");
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, time::Duration};

    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }

    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                is_colored: true,
                print_config: false,
            }
        );
    }

    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                is_colored: false,
                print_config: true,
            }
        );
    }

    #[rstest]
    fn test_logging_to_file() {
        let config = LoggerConfig {
            fileout_level: LevelFilter::Debug,
            ..Default::default()
        };

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        wait_until(
            || {
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file())
            },
            Duration::from_secs(2),
        );

        drop(log_guard);

        wait_until(
            || {
                let log_file_path = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                    .expect("No files found in directory")
                    .path();
                dbg!(&log_file_path);
                log_contents =
                    std::fs::read_to_string(log_file_path).expect("Error while reading log file");
                !log_contents.is_empty()
            },
            Duration::from_secs(2),
        );

        assert_eq!(
            log_contents,
            "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
        );
    }

    #[rstest]
    fn test_log_component_level_filtering() {
        let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    let log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.contains("RiskEngine")
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert!(
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file()),
            "Log file exists"
        );
    }

    #[rstest]
    fn test_logging_to_file_in_json_format() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                .unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            file_format: Some("json".to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                } else {
                    false
                }
            },
            Duration::from_secs(2),
        );

        assert_eq!(
            log_contents,
            "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
        );
    }
}