1use std::{
17 cell::{Ref, RefCell},
18 rc::Rc,
19 time::Duration,
20};
21
22use nautilus_common::{
23 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
24 clock::{Clock, TestClock},
25 component::Component,
26 enums::Environment,
27 logging::{
28 headers, init_logging,
29 logger::{LogGuard, LoggerConfig},
30 writer::FileWriterConfig,
31 },
32 msgbus::{MessageBus, set_message_bus},
33};
34use nautilus_core::{UUID4, UnixNanos};
35use nautilus_data::engine::DataEngine;
36use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
37use nautilus_model::identifiers::{ClientId, TraderId};
38use nautilus_portfolio::portfolio::Portfolio;
39use nautilus_risk::engine::RiskEngine;
40use ustr::Ustr;
41
42use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
43
/// Core system kernel that owns the components wired together by [`NautilusKernel::new`].
#[derive(Debug)]
pub struct NautilusKernel {
    /// Name given to this kernel; a copy is handed to the message bus on construction.
    pub name: String,
    /// Identifier of this kernel instance (taken from the config, or a default when unset).
    pub instance_id: UUID4,
    /// Identifier of the host machine (the host name reported by `sysinfo`).
    pub machine_id: String,
    /// Configuration the kernel was built from.
    pub config: Box<dyn NautilusKernelConfig>,
    /// Cache handle shared with the portfolio, engines, order emulator, and trader.
    pub cache: Rc<RefCell<Cache>>,
    /// Clock handle shared with the portfolio, engines, order emulator, and trader.
    pub clock: Rc<RefCell<dyn Clock>>,
    /// Portfolio handle, also passed to the trader.
    pub portfolio: Rc<RefCell<Portfolio>>,
    /// Guard obtained when logging was initialized; stored for as long as the kernel exists.
    pub log_guard: LogGuard,
    /// Data engine handle.
    pub data_engine: Rc<RefCell<DataEngine>>,
    /// Risk engine handle.
    pub risk_engine: Rc<RefCell<RiskEngine>>,
    /// Execution engine handle.
    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
    /// Order emulator adapter.
    pub order_emulator: OrderEmulatorAdapter,
    /// Trader instance managed by this kernel.
    pub trader: Trader,
    /// Clock timestamp taken at the end of construction.
    pub ts_created: UnixNanos,
    /// Clock timestamp recorded by `start`; `None` before that and after `reset`.
    pub ts_started: Option<UnixNanos>,
    /// Clock timestamp recorded by `finalize_stop`; `None` before that and after `reset`.
    pub ts_shutdown: Option<UnixNanos>,
}
82
83impl NautilusKernel {
    /// Returns a [`NautilusKernelBuilder`] created from the given name, trader ID, and environment.
    #[must_use]
    pub const fn builder(
        name: String,
        trader_id: TraderId,
        environment: Environment,
    ) -> NautilusKernelBuilder {
        NautilusKernelBuilder::new(name, trader_id, environment)
    }
93
94 pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
100 let instance_id = config.instance_id().unwrap_or_default();
101 let machine_id = Self::determine_machine_id()?;
102
103 let logger_config = config.logging();
104 let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
105 headers::log_header(
106 config.trader_id(),
107 &machine_id,
108 instance_id,
109 Ustr::from(stringify!(LiveNode)),
110 );
111
112 log::info!("Building system kernel");
113
114 let clock = Self::initialize_clock(&config.environment());
115 let cache = Self::initialize_cache(config.cache());
116
117 let msgbus = Rc::new(RefCell::new(MessageBus::new(
118 config.trader_id(),
119 instance_id,
120 Some(name.clone()),
121 None,
122 )));
123 set_message_bus(msgbus);
124
125 let portfolio = Rc::new(RefCell::new(Portfolio::new(
126 cache.clone(),
127 clock.clone(),
128 config.portfolio(),
129 )));
130
131 let risk_engine = RiskEngine::new(
132 config.risk_engine().unwrap_or_default(),
133 portfolio.borrow().clone_shallow(),
134 clock.clone(),
135 cache.clone(),
136 );
137 let risk_engine = Rc::new(RefCell::new(risk_engine));
138
139 let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
140 let exec_engine = Rc::new(RefCell::new(exec_engine));
141
142 let order_emulator =
143 OrderEmulatorAdapter::new(config.trader_id(), clock.clone(), cache.clone());
144
145 let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
146 let data_engine = Rc::new(RefCell::new(data_engine));
147
148 DataEngine::register_msgbus_handlers(data_engine.clone());
149 RiskEngine::register_msgbus_handlers(risk_engine.clone());
150 ExecutionEngine::register_msgbus_handlers(exec_engine.clone());
151
152 let trader = Trader::new(
153 config.trader_id(),
154 instance_id,
155 config.environment(),
156 clock.clone(),
157 cache.clone(),
158 portfolio.clone(),
159 );
160
161 let ts_created = clock.borrow().timestamp_ns();
162
163 Ok(Self {
164 name,
165 instance_id,
166 machine_id,
167 config: Box::new(config),
168 cache,
169 clock,
170 portfolio,
171 log_guard,
172 data_engine,
173 risk_engine,
174 exec_engine,
175 order_emulator,
176 trader,
177 ts_created,
178 ts_started: None,
179 ts_shutdown: None,
180 })
181 }
182
183 fn determine_machine_id() -> anyhow::Result<String> {
184 sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
185 }
186
187 fn initialize_logging(
188 trader_id: TraderId,
189 instance_id: UUID4,
190 config: LoggerConfig,
191 ) -> anyhow::Result<LogGuard> {
192 #[cfg(feature = "tracing-bridge")]
193 let use_tracing = config.use_tracing;
194
195 let log_guard = match init_logging(
196 trader_id,
197 instance_id,
198 config,
199 FileWriterConfig::default(), ) {
201 Ok(guard) => guard,
202 Err(e) => {
203 if e.downcast_ref::<log::SetLoggerError>().is_some() {
208 if let Some(guard) = LogGuard::new() {
209 guard
210 } else {
211 return Err(e.context(
212 "A non-Nautilus logger is already registered; \
213 cannot initialize Nautilus logging",
214 ));
215 }
216 } else {
217 return Err(e);
218 }
219 }
220 };
221
222 #[cfg(feature = "tracing-bridge")]
224 if use_tracing && !nautilus_common::logging::bridge::tracing_is_initialized() {
225 nautilus_common::logging::bridge::init_tracing()?;
226 }
227
228 Ok(log_guard)
229 }
230
231 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
232 match environment {
233 Environment::Backtest => {
234 let test_clock = TestClock::new();
235 Rc::new(RefCell::new(test_clock))
236 }
237 #[cfg(feature = "live")]
238 Environment::Live | Environment::Sandbox => {
239 let live_clock = nautilus_common::live::clock::LiveClock::default(); Rc::new(RefCell::new(live_clock))
241 }
242 #[cfg(not(feature = "live"))]
243 Environment::Live | Environment::Sandbox => {
244 panic!(
245 "Live/Sandbox environment requires the 'live' feature to be enabled. \
246 Build with `--features live` or add `features = [\"live\"]` to your dependency."
247 );
248 }
249 }
250 }
251
252 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
253 let cache_config = cache_config.unwrap_or_default();
254
255 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
257 let cache = Cache::new(Some(cache_config), cache_database);
258
259 Rc::new(RefCell::new(cache))
260 }
261
262 fn cancel_timers(&self) {
263 self.clock.borrow_mut().cancel_timers();
264 }
265
266 #[must_use]
267 pub fn generate_timestamp_ns(&self) -> UnixNanos {
268 self.clock.borrow().timestamp_ns()
269 }
270
    /// Returns the environment reported by the kernel configuration.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.config.environment()
    }
276
    /// Returns the kernel name.
    #[must_use]
    pub const fn name(&self) -> &str {
        self.name.as_str()
    }
282
    /// Returns the trader ID reported by the kernel configuration.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.config.trader_id()
    }
288
289 #[must_use]
291 pub fn machine_id(&self) -> &str {
292 &self.machine_id
293 }
294
    /// Returns the instance ID of this kernel.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.instance_id
    }
300
    /// Returns the post-stop delay reported by the kernel configuration.
    #[must_use]
    pub fn delay_post_stop(&self) -> Duration {
        self.config.delay_post_stop()
    }
306
    /// Returns the timestamp recorded when the kernel was created.
    #[must_use]
    pub const fn ts_created(&self) -> UnixNanos {
        self.ts_created
    }
312
    /// Returns the timestamp recorded by the last `start`, or `None` if there is none.
    #[must_use]
    pub const fn ts_started(&self) -> Option<UnixNanos> {
        self.ts_started
    }
318
    /// Returns the timestamp recorded by the last `finalize_stop`, or `None` if there is none.
    #[must_use]
    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
        self.ts_shutdown
    }
324
    /// Returns the `load_state` flag reported by the kernel configuration.
    #[must_use]
    pub fn load_state(&self) -> bool {
        self.config.load_state()
    }
330
    /// Returns the `save_state` flag reported by the kernel configuration.
    #[must_use]
    pub fn save_state(&self) -> bool {
        self.config.save_state()
    }
336
337 #[must_use]
339 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
340 self.clock.clone()
341 }
342
343 #[must_use]
345 pub fn cache(&self) -> Rc<RefCell<Cache>> {
346 self.cache.clone()
347 }
348
    /// Returns a shared borrow of the portfolio.
    ///
    /// # Panics
    ///
    /// Panics if the portfolio is currently mutably borrowed (standard `RefCell` behavior).
    #[must_use]
    pub fn portfolio(&self) -> Ref<'_, Portfolio> {
        self.portfolio.borrow()
    }
354
    /// Returns a shared borrow of the data engine.
    ///
    /// # Panics
    ///
    /// Panics if the data engine is currently mutably borrowed (standard `RefCell` behavior).
    #[must_use]
    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
        self.data_engine.borrow()
    }
360
    /// Returns a reference to the shared risk engine handle.
    #[must_use]
    pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
        &self.risk_engine
    }
366
    /// Returns a reference to the shared execution engine handle.
    #[must_use]
    pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
        &self.exec_engine
    }
372
    /// Returns a reference to the trader.
    #[must_use]
    pub const fn trader(&self) -> &Trader {
        &self.trader
    }
378
379 pub fn start(&mut self) {
381 log::info!("Starting");
382 self.start_engines();
383
384 log::info!("Initializing trader");
385 if let Err(e) = self.trader.initialize() {
386 log::error!("Error initializing trader: {e:?}");
387 return;
388 }
389
390 log::info!("Starting clients...");
391 if let Err(e) = self.start_clients() {
392 log::error!("Error starting clients: {e:?}");
393 }
394 log::info!("Clients started");
395
396 self.ts_started = Some(self.clock.borrow().timestamp_ns());
397 log::info!("Started");
398 }
399
    /// Async entry point for starting the kernel; delegates to the synchronous `start`.
    pub async fn start_async(&mut self) {
        self.start();
    }
404
405 pub fn start_trader(&mut self) {
409 log::info!("Starting trader...");
410 if let Err(e) = self.trader.start() {
411 log::error!("Error starting trader: {e:?}");
412 }
413 log::info!("Trader started");
414 }
415
416 pub fn stop_trader(&mut self) {
422 if !self.trader.is_running() {
423 return;
424 }
425
426 log::info!("Stopping trader...");
427
428 if let Err(e) = self.trader.stop() {
429 log::error!("Error stopping trader: {e}");
430 }
431 }
432
433 pub async fn finalize_stop(&mut self) {
438 if let Err(e) = self.stop_all_clients() {
440 log::error!("Error stopping clients: {e:?}");
441 }
442
443 self.stop_engines();
444 self.cancel_timers();
445
446 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
447 log::info!("Stopped");
448 }
449
450 pub fn reset(&mut self) {
452 log::info!("Resetting");
453
454 if let Err(e) = self.trader.reset() {
455 log::error!("Error resetting trader: {e:?}");
456 }
457
458 self.data_engine.borrow_mut().reset();
459 self.exec_engine.borrow_mut().reset();
460 self.risk_engine.borrow_mut().reset();
461
462 self.ts_started = None;
463 self.ts_shutdown = None;
464
465 log::info!("Reset");
466 }
467
468 pub fn dispose(&mut self) {
470 log::info!("Disposing");
471
472 if let Err(e) = self.trader.dispose() {
473 log::error!("Error disposing trader: {e:?}");
474 }
475
476 self.stop_engines();
477
478 self.data_engine.borrow_mut().dispose();
479 self.exec_engine.borrow_mut().dispose();
480 self.risk_engine.borrow_mut().dispose();
481
482 log::info!("Disposed");
483 }
484
485 fn start_engines(&self) {
487 self.data_engine.borrow_mut().start();
488 self.exec_engine.borrow_mut().start();
489 self.risk_engine.borrow_mut().start();
490 }
491
492 fn stop_engines(&self) {
494 self.data_engine.borrow_mut().stop();
495 self.exec_engine.borrow_mut().stop();
496 self.risk_engine.borrow_mut().stop();
497 }
498
499 fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
504 let mut errors = Vec::new();
505
506 {
507 let mut exec_engine = self.exec_engine.borrow_mut();
508 let exec_adapters = exec_engine.get_clients_mut();
509
510 for adapter in exec_adapters {
511 if let Err(e) = adapter.start() {
512 log::error!("Error starting execution client {}: {e}", adapter.client_id);
513 errors.push(e);
514 }
515 }
516 }
517
518 if errors.is_empty() {
519 Ok(())
520 } else {
521 Err(errors)
522 }
523 }
524
525 fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
530 let mut errors = Vec::new();
531
532 {
533 let mut exec_engine = self.exec_engine.borrow_mut();
534 let exec_adapters = exec_engine.get_clients_mut();
535
536 for adapter in exec_adapters {
537 if let Err(e) = adapter.stop() {
538 log::error!("Error stopping execution client {}: {e}", adapter.client_id);
539 errors.push(e);
540 }
541 }
542 }
543
544 if errors.is_empty() {
545 Ok(())
546 } else {
547 Err(errors)
548 }
549 }
550
    /// Connects the data engine's clients, then the execution engine's clients.
    ///
    /// Each engine is mutably borrowed for the duration of its `connect().await`.
    // NOTE(review): the lint is silenced on the assumption that nothing else borrows the
    // engines while these futures are pending — confirm against the runtime setup.
    #[allow(clippy::await_holding_refcell_ref)]
    pub async fn connect_clients(&mut self) {
        log::info!("Connecting clients...");
        self.data_engine.borrow_mut().connect().await;
        self.exec_engine.borrow_mut().connect().await;
    }
560
561 #[allow(clippy::await_holding_refcell_ref)] pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
568 log::info!("Disconnecting clients...");
569 self.data_engine.borrow_mut().disconnect().await?;
570 self.exec_engine.borrow_mut().disconnect().await?;
571 Ok(())
572 }
573
574 #[must_use]
576 pub fn check_engines_connected(&self) -> bool {
577 self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
578 }
579
580 #[must_use]
582 pub fn check_engines_disconnected(&self) -> bool {
583 self.data_engine.borrow().check_disconnected()
584 && self.exec_engine.borrow().check_disconnected()
585 }
586
    /// Returns the data engine's per-client connection status as `(ClientId, bool)` pairs.
    // NOTE(review): the `bool` presumably means "connected" — confirm against `DataEngine`.
    #[must_use]
    pub fn data_client_connection_status(&self) -> Vec<(ClientId, bool)> {
        self.data_engine.borrow().client_connection_status()
    }
592
    /// Returns the execution engine's per-client connection status as `(ClientId, bool)` pairs.
    // NOTE(review): the `bool` presumably means "connected" — confirm against `ExecutionEngine`.
    #[must_use]
    pub fn exec_client_connection_status(&self) -> Vec<(ClientId, bool)> {
        self.exec_engine.borrow().client_connection_status()
    }
598}