use std::{
    collections::HashMap,
    env,
    fmt::Display,
    str::FromStr,
    sync::{atomic::Ordering, mpsc::SendError},
};

use indexmap::IndexMap;
use log::{
    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
    kv::{ToValue, Value},
    set_boxed_logger, set_max_level,
};
use nautilus_core::{
    UUID4, UnixNanos,
    datetime::unix_nanos_to_iso8601,
    time::{get_atomic_clock_realtime, get_atomic_clock_static},
};
use nautilus_model::identifiers::TraderId;
use serde::{Deserialize, Serialize, Serializer};
use ustr::Ustr;

use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
use crate::{
    enums::{LogColor, LogLevel},
    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
};

/// The name given to the dedicated logging thread.
const LOGGING: &str = "logging";

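/// Configuration for the [`Logger`], controlling the global and per-component level filters
/// plus console coloring and whether the configuration is printed at initialization.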
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum log level written to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum log level written to the log file (`Off` disables file logging).
    pub fileout_level: LevelFilter,
    /// Per-component maximum log levels, overriding the global filters.
    component_level: HashMap<Ustr, LevelFilter>,
    /// Whether ANSI color codes are applied to console output.
    pub is_colored: bool,
    /// Whether the logger configuration is printed to stdout at initialization.
    pub print_config: bool,
}

impl Default for LoggerConfig {
    fn default() -> Self {
        Self {
            stdout_level: LevelFilter::Info,
            fileout_level: LevelFilter::Off,
            component_level: HashMap::new(),
            is_colored: false,
            print_config: false,
        }
    }
}

impl LoggerConfig {
    /// Creates a new [`LoggerConfig`] instance.
    #[must_use]
    pub const fn new(
        stdout_level: LevelFilter,
        fileout_level: LevelFilter,
        component_level: HashMap<Ustr, LevelFilter>,
        is_colored: bool,
        print_config: bool,
    ) -> Self {
        Self {
            stdout_level,
            fileout_level,
            component_level,
            is_colored,
            print_config,
        }
    }

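    /// Builds a [`LoggerConfig`] from a spec string.
    ///
    /// The spec is a semicolon-separated list containing the standalone flags `is_colored`
    /// and `print_config`, the keys `stdout` and `fileout` (global level filters), and any
    /// other `Component=Level` pair, which sets a per-component filter.
    ///
    /// A minimal usage sketch (the spec value is illustrative):
    ///
    /// ```ignore
    /// let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error;is_colored")
    ///     .expect("valid spec");
    /// ```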
    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
        let mut config = Self::default();
        for kv in spec.split(';') {
            let kv = kv.trim();
            if kv.is_empty() {
                continue;
            }
            let kv_lower = kv.to_lowercase();
            if kv_lower == "is_colored" {
                config.is_colored = true;
            } else if kv_lower == "print_config" {
                config.print_config = true;
            } else {
                let parts: Vec<&str> = kv.split('=').collect();
                if parts.len() != 2 {
                    anyhow::bail!("Invalid spec pair: {}", kv);
                }
                let k = parts[0].trim();
                let v = parts[1].trim();
                let lvl = LevelFilter::from_str(v)
                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
                let k_lower = k.to_lowercase();
                match k_lower.as_str() {
                    "stdout" => config.stdout_level = lvl,
                    "fileout" => config.fileout_level = lvl,
                    _ => {
                        config.component_level.insert(Ustr::from(k), lvl);
                    }
                };
            }
        }
        Ok(config)
    }

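    /// Builds a [`LoggerConfig`] from the `NAUTILUS_LOG` environment variable, which is
    /// expected to contain a spec string in the format accepted by [`Self::from_spec`].
    ///
    /// A minimal usage sketch (the spec value is illustrative):
    ///
    /// ```ignore
    /// std::env::set_var("NAUTILUS_LOG", "stdout=Info;RiskEngine=Error");
    /// let config = LoggerConfig::from_env().expect("valid NAUTILUS_LOG spec");
    /// ```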
    pub fn from_env() -> anyhow::Result<Self> {
        let spec = env::var("NAUTILUS_LOG")?;
        Self::from_spec(&spec)
    }
}

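/// A logger implementing the [`Log`] trait, which forwards log events over an MPSC channel
/// to a dedicated logging thread for writing to stdout, stderr, and an optional log file.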
#[derive(Debug)]
pub struct Logger {
    /// Configuration for the logger.
    pub config: LoggerConfig,
    /// Channel sender for passing log events to the logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}

pub enum LogEvent {
    /// A log line to write to the configured sinks.
    Log(LogLine),
    /// A command to flush buffered output; the logging thread stops after processing it.
    Flush,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp for the event (UNIX nanoseconds).
    pub timestamp: UnixNanos,
    /// The log level for the event.
    pub level: Level,
    /// The color for the log message content.
    pub color: LogColor,
    /// The component which generated the log event.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}

impl Display for LogLine {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
    }
}

pub struct LogLineWrapper {
    /// The wrapped log line.
    line: LogLine,
    /// Cached plain-text representation of the line.
    cache: Option<String>,
    /// Cached ANSI-colored representation of the line.
    colored: Option<String>,
    /// The trader ID for the logger.
    trader_id: Ustr,
}

impl LogLineWrapper {
    /// Creates a new [`LogLineWrapper`] for the given line and trader ID.
    #[must_use]
    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
        Self {
            line,
            cache: None,
            colored: None,
            trader_id,
        }
    }

    /// Returns the plain-text log line, formatting and caching it on first call.
    pub fn get_string(&mut self) -> &str {
        self.cache.get_or_insert_with(|| {
            format!(
                "{} [{}] {}.{}: {}\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

    /// Returns the ANSI-colored log line, formatting and caching it on first call.
    pub fn get_colored(&mut self) -> &str {
        self.colored.get_or_insert_with(|| {
            format!(
                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                &self.line.color.as_ansi(),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

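    /// Serializes the wrapped log line to a JSON object terminated by a newline.
    ///
    /// A sketch of the output shape (field values are illustrative, matching the tests below):
    ///
    /// ```text
    /// {"timestamp":"1970-01-20T02:20:00.000000000Z","trader_id":"TRADER-001","level":"INFO","color":"NORMAL","component":"RiskEngine","message":"This is a test."}
    /// ```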
    #[must_use]
    pub fn get_json(&self) -> String {
        let json_string =
            serde_json::to_string(&self).expect("Error serializing log event to string");
        format!("{json_string}\n")
    }
}

impl Serialize for LogLineWrapper {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut json_obj = IndexMap::new();
        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
        json_obj.insert("timestamp".to_string(), timestamp);
        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
        json_obj.insert("level".to_string(), self.line.level.to_string());
        json_obj.insert("color".to_string(), self.line.color.to_string());
        json_obj.insert("component".to_string(), self.line.component.to_string());
        json_obj.insert("message".to_string(), self.line.message.to_string());

        json_obj.serialize(serializer)
    }
}

impl Log for Logger {
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let key_values = record.key_values();
            let color = key_values
                .get("color".into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(LogColor::Normal);
            let component = key_values.get("component".into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level: record.level(),
                color,
                component,
                message: format!("{}", record.args()),
            };
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event (receiver closed): {e}");
        }
    }
}

#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes the logger, reading its configuration from the `NAUTILUS_LOG` environment
    /// variable (see [`LoggerConfig::from_env`]).
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

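    /// Initializes the global `log` implementation with the given configuration, spawning a
    /// dedicated logging thread and returning a [`LogGuard`] which flushes and joins that
    /// thread when dropped.
    ///
    /// A minimal usage sketch (identifiers are illustrative):
    ///
    /// ```ignore
    /// let guard = Logger::init_with_config(
    ///     TraderId::from("TRADER-001"),
    ///     UUID4::new(),
    ///     LoggerConfig::default(),
    ///     FileWriterConfig::default(),
    /// )?;
    /// // Keep `guard` alive for the lifetime of the application so logs are flushed on exit.
    /// ```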
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger = Self {
            tx,
            config: config.clone(),
        };

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        let handle: Option<std::thread::JoinHandle<()>>;
        match set_boxed_logger(Box::new(logger)) {
            Ok(()) => {
                handle = Some(
                    std::thread::Builder::new()
                        .name(LOGGING.to_string())
                        .spawn(move || {
                            Self::handle_messages(
                                trader_id.to_string(),
                                instance_id.to_string(),
                                config,
                                file_config,
                                rx,
                            );
                        })?,
                );

                let max_level = log::LevelFilter::Trace;
                set_max_level(max_level);
                if print_config {
                    println!("Logger set as `log` implementation with max level {max_level}");
                }
            }
            Err(e) => {
                anyhow::bail!("Cannot initialize logger because of error: {e}");
            }
        }

        Ok(LogGuard::new(handle))
    }

    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            ref component_level,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        // Set up console writers and an optional file writer
        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
        };

        // Receive and handle log events until the channel closes or a flush event arrives
        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Flush => {
                    break;
                }
                LogEvent::Log(line) => {
                    let component_level = component_level.get(&line.component);

                    // Skip lines which exceed the maximum level for their component
                    if let Some(&filter_level) = component_level {
                        if line.level > filter_level {
                            continue;
                        }
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(ref mut writer) = file_writer_opt {
                        if writer.enabled(&wrapper.line) {
                            if writer.json_format {
                                writer.write(&wrapper.get_json());
                            } else {
                                writer.write(wrapper.get_string());
                            }
                        }
                    }
                }
            }
        }
    }
}

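/// Logs a message with the given level, color, and component through the global `log` facade.
///
/// A minimal usage sketch (the component name and message are illustrative):
///
/// ```ignore
/// log(
///     LogLevel::Info,
///     LogColor::Normal,
///     Ustr::from("RiskEngine"),
///     "Initialized",
/// );
/// ```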
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
    let color = Value::from(color as u8);

    match level {
        LogLevel::Off => {}
        LogLevel::Trace => {
            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Debug => {
            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Info => {
            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Warning => {
            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Error => {
            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
    }
}

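/// An RAII guard for the logging thread.
///
/// On drop, flushes the global logger and joins the logging thread, so it should be held for
/// the full lifetime of the application.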
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    handle: Option<std::thread::JoinHandle<()>>,
}

impl LogGuard {
    /// Creates a new [`LogGuard`] wrapping the logging thread handle.
    #[must_use]
    pub const fn new(handle: Option<std::thread::JoinHandle<()>>) -> Self {
        Self { handle }
    }
}

impl Default for LogGuard {
    fn default() -> Self {
        Self::new(None)
    }
}

impl Drop for LogGuard {
    fn drop(&mut self) {
        log::logger().flush();
        if let Some(handle) = self.handle.take() {
            handle.join().expect("Error joining logging handle");
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, time::Duration};

    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }

    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                is_colored: true,
                print_config: false,
            }
        );
    }

    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                is_colored: false,
                print_config: true,
            }
        );
    }

    #[rstest]
    fn test_logging_to_file() {
        let config = LoggerConfig {
            fileout_level: LevelFilter::Debug,
            ..Default::default()
        };

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        // Wait for the log file to be created
        wait_until(
            || {
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file())
            },
            Duration::from_secs(2),
        );

        // Drop the guard to flush the logger and join the logging thread
        drop(log_guard);

        wait_until(
            || {
                let log_file_path = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                    .expect("No files found in directory")
                    .path();
                dbg!(&log_file_path);
                log_contents =
                    std::fs::read_to_string(log_file_path).expect("Error while reading log file");
                !log_contents.is_empty()
            },
            Duration::from_secs(2),
        );

        assert_eq!(
            log_contents,
            "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
        );
    }

    #[rstest]
    fn test_log_component_level_filtering() {
        let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        // Drop the guard to flush the logger and join the logging thread
        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    let log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.contains("RiskEngine")
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert!(
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file()),
            "Log file exists"
        );
    }

    #[rstest]
    fn test_logging_to_file_in_json_format() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                .unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            file_format: Some("json".to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        // Drop the guard to flush the logger and join the logging thread
        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                } else {
                    false
                }
            },
            Duration::from_secs(2),
        );

        assert_eq!(
            log_contents,
            "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
        );
    }
}