1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 rc::Rc,
20 time::Duration,
21};
22
23use nautilus_common::{
24 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
25 clock::{Clock, TestClock},
26 component::Component,
27 enums::Environment,
28 logging::{
29 headers, init_logging,
30 logger::{LogGuard, LoggerConfig},
31 writer::FileWriterConfig,
32 },
33 messages::{DataResponse, ExecutionReport, data::DataCommand, execution::TradingCommand},
34 msgbus::{
35 self, MessageBus,
36 handler::{ShareableMessageHandler, TypedMessageHandler},
37 set_message_bus,
38 switchboard::MessagingSwitchboard,
39 },
40 runner::get_data_cmd_sender,
41};
42use nautilus_core::{UUID4, UnixNanos, WeakCell};
43use nautilus_data::engine::DataEngine;
44use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
45use nautilus_model::{events::OrderEventAny, identifiers::TraderId};
46use nautilus_portfolio::portfolio::Portfolio;
47use nautilus_risk::engine::RiskEngine;
48use ustr::Ustr;
49
50use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
51
52#[derive(Debug)]
56pub struct NautilusKernel {
57 pub name: String,
59 pub instance_id: UUID4,
61 pub machine_id: String,
63 pub config: Box<dyn NautilusKernelConfig>,
65 pub cache: Rc<RefCell<Cache>>,
67 pub clock: Rc<RefCell<dyn Clock>>,
69 pub portfolio: Rc<RefCell<Portfolio>>,
71 pub log_guard: LogGuard,
73 pub data_engine: Rc<RefCell<DataEngine>>,
75 pub risk_engine: Rc<RefCell<RiskEngine>>,
77 pub exec_engine: Rc<RefCell<ExecutionEngine>>,
79 pub order_emulator: OrderEmulatorAdapter,
81 pub trader: Trader,
83 pub ts_created: UnixNanos,
85 pub ts_started: Option<UnixNanos>,
87 pub ts_shutdown: Option<UnixNanos>,
89}
90
91impl NautilusKernel {
92 #[must_use]
94 pub const fn builder(
95 name: String,
96 trader_id: TraderId,
97 environment: Environment,
98 ) -> NautilusKernelBuilder {
99 NautilusKernelBuilder::new(name, trader_id, environment)
100 }
101
102 pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
108 let instance_id = config.instance_id().unwrap_or_default();
109 let machine_id = Self::determine_machine_id()?;
110
111 let logger_config = config.logging();
112 let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
113 headers::log_header(
114 config.trader_id(),
115 &machine_id,
116 instance_id,
117 Ustr::from(stringify!(LiveNode)),
118 );
119
120 log::info!("Building system kernel");
121
122 let clock = Self::initialize_clock(&config.environment());
123 let cache = Self::initialize_cache(config.cache());
124
125 let msgbus = Rc::new(RefCell::new(MessageBus::new(
126 config.trader_id(),
127 instance_id,
128 Some(name.clone()),
129 None,
130 )));
131 set_message_bus(msgbus);
132
133 let portfolio = Rc::new(RefCell::new(Portfolio::new(
134 cache.clone(),
135 clock.clone(),
136 config.portfolio(),
137 )));
138
139 let risk_engine = RiskEngine::new(
140 config.risk_engine().unwrap_or_default(),
141 portfolio.borrow().clone_shallow(),
142 clock.clone(),
143 cache.clone(),
144 );
145 let risk_engine = Rc::new(RefCell::new(risk_engine));
146
147 let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
148 let exec_engine = Rc::new(RefCell::new(exec_engine));
149
150 let order_emulator = OrderEmulatorAdapter::new(clock.clone(), cache.clone());
152
153 let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
154 let data_engine = Rc::new(RefCell::new(data_engine));
155
156 let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
159 let data_engine_weak_clone1 = data_engine_weak.clone();
160 let endpoint = MessagingSwitchboard::data_engine_execute();
161 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
162 move |cmd: &DataCommand| {
163 if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
164 engine_rc.borrow_mut().execute(cmd);
165 }
166 },
167 )));
168 msgbus::register(endpoint, handler);
169
170 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
172 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
173 move |cmd: &DataCommand| {
174 get_data_cmd_sender().clone().execute(cmd.clone());
175 },
176 )));
177 msgbus::register(endpoint, handler);
178
179 let endpoint = MessagingSwitchboard::data_engine_process();
181 let data_engine_weak2 = data_engine_weak.clone();
182 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
183 move |data: &dyn Any| {
184 if let Some(engine_rc) = data_engine_weak2.upgrade() {
185 engine_rc.borrow_mut().process(data);
186 }
187 },
188 )));
189 msgbus::register(endpoint, handler);
190
191 let endpoint = MessagingSwitchboard::data_engine_response();
193 let data_engine_weak3 = data_engine_weak;
194 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
195 move |resp: &DataResponse| {
196 if let Some(engine_rc) = data_engine_weak3.upgrade() {
197 engine_rc.borrow_mut().response(resp.clone());
198 }
199 },
200 )));
201 msgbus::register(endpoint, handler);
202
203 let risk_engine_weak = WeakCell::from(Rc::downgrade(&risk_engine));
205 let endpoint = MessagingSwitchboard::risk_engine_execute();
206 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
207 move |cmd: &TradingCommand| {
208 if let Some(engine_rc) = risk_engine_weak.upgrade() {
209 engine_rc.borrow_mut().execute(cmd.clone());
210 }
211 },
212 )));
213 msgbus::register(endpoint, handler);
214
215 let exec_engine_weak1 = WeakCell::from(Rc::downgrade(&exec_engine));
217 let endpoint = MessagingSwitchboard::exec_engine_execute();
218 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
219 move |cmd: &TradingCommand| {
220 if let Some(engine_rc) = exec_engine_weak1.upgrade() {
221 engine_rc.borrow().execute(cmd);
222 }
223 },
224 )));
225 msgbus::register(endpoint, handler);
226
227 let exec_engine_weak2 = WeakCell::from(Rc::downgrade(&exec_engine));
229 let endpoint = MessagingSwitchboard::exec_engine_process();
230 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
231 move |event: &OrderEventAny| {
232 if let Some(engine_rc) = exec_engine_weak2.upgrade() {
233 engine_rc.borrow_mut().process(event);
234 }
235 },
236 )));
237 msgbus::register(endpoint, handler);
238
239 let exec_engine_weak3 = WeakCell::from(Rc::downgrade(&exec_engine));
241 let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
242 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
243 move |report: &ExecutionReport| {
244 if let Some(engine_rc) = exec_engine_weak3.upgrade() {
245 engine_rc.borrow_mut().reconcile_execution_report(report);
246 }
247 },
248 )));
249 msgbus::register(endpoint, handler);
250
251 let trader = Trader::new(
252 config.trader_id(),
253 instance_id,
254 config.environment(),
255 clock.clone(),
256 cache.clone(),
257 portfolio.clone(),
258 );
259
260 let ts_created = clock.borrow().timestamp_ns();
261
262 Ok(Self {
263 name,
264 instance_id,
265 machine_id,
266 config: Box::new(config),
267 cache,
268 clock,
269 portfolio,
270 log_guard,
271 data_engine,
272 risk_engine,
273 exec_engine,
274 order_emulator,
275 trader,
276 ts_created,
277 ts_started: None,
278 ts_shutdown: None,
279 })
280 }
281
282 fn determine_machine_id() -> anyhow::Result<String> {
283 sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
284 }
285
286 fn initialize_logging(
287 trader_id: TraderId,
288 instance_id: UUID4,
289 config: LoggerConfig,
290 ) -> anyhow::Result<LogGuard> {
291 let log_guard = init_logging(
292 trader_id,
293 instance_id,
294 config,
295 FileWriterConfig::default(), )?;
297
298 Ok(log_guard)
299 }
300
301 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
302 match environment {
303 Environment::Backtest => {
304 let test_clock = TestClock::new();
305 Rc::new(RefCell::new(test_clock))
306 }
307 #[cfg(feature = "live")]
308 Environment::Live | Environment::Sandbox => {
309 let live_clock = nautilus_common::live::clock::LiveClock::default(); Rc::new(RefCell::new(live_clock))
311 }
312 #[cfg(not(feature = "live"))]
313 Environment::Live | Environment::Sandbox => {
314 panic!(
315 "Live/Sandbox environment requires the 'live' feature to be enabled. \
316 Build with `--features live` or add `features = [\"live\"]` to your dependency."
317 );
318 }
319 }
320 }
321
322 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
323 let cache_config = cache_config.unwrap_or_default();
324
325 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
327 let cache = Cache::new(Some(cache_config), cache_database);
328
329 Rc::new(RefCell::new(cache))
330 }
331
332 fn cancel_timers(&self) {
333 self.clock.borrow_mut().cancel_timers();
334 }
335
336 #[must_use]
337 pub fn generate_timestamp_ns(&self) -> UnixNanos {
338 self.clock.borrow().timestamp_ns()
339 }
340
341 #[must_use]
343 pub fn environment(&self) -> Environment {
344 self.config.environment()
345 }
346
347 #[must_use]
349 pub const fn name(&self) -> &str {
350 self.name.as_str()
351 }
352
353 #[must_use]
355 pub fn trader_id(&self) -> TraderId {
356 self.config.trader_id()
357 }
358
359 #[must_use]
361 pub fn machine_id(&self) -> &str {
362 &self.machine_id
363 }
364
365 #[must_use]
367 pub const fn instance_id(&self) -> UUID4 {
368 self.instance_id
369 }
370
371 #[must_use]
373 pub fn delay_post_stop(&self) -> Duration {
374 self.config.delay_post_stop()
375 }
376
377 #[must_use]
379 pub const fn ts_created(&self) -> UnixNanos {
380 self.ts_created
381 }
382
383 #[must_use]
385 pub const fn ts_started(&self) -> Option<UnixNanos> {
386 self.ts_started
387 }
388
389 #[must_use]
391 pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
392 self.ts_shutdown
393 }
394
395 #[must_use]
397 pub fn load_state(&self) -> bool {
398 self.config.load_state()
399 }
400
401 #[must_use]
403 pub fn save_state(&self) -> bool {
404 self.config.save_state()
405 }
406
407 #[must_use]
409 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
410 self.clock.clone()
411 }
412
413 #[must_use]
415 pub fn cache(&self) -> Rc<RefCell<Cache>> {
416 self.cache.clone()
417 }
418
419 #[must_use]
421 pub fn portfolio(&self) -> Ref<'_, Portfolio> {
422 self.portfolio.borrow()
423 }
424
425 #[must_use]
427 pub fn data_engine(&self) -> Ref<'_, DataEngine> {
428 self.data_engine.borrow()
429 }
430
431 #[must_use]
433 pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
434 &self.risk_engine
435 }
436
437 #[must_use]
439 pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
440 &self.exec_engine
441 }
442
443 #[must_use]
445 pub const fn trader(&self) -> &Trader {
446 &self.trader
447 }
448
449 pub async fn start_async(&mut self) {
451 log::info!("Starting");
452 self.start_engines();
453
454 log::info!("Initializing trader");
455 if let Err(e) = self.trader.initialize() {
456 log::error!("Error initializing trader: {e:?}");
457 return;
458 }
459
460 log::info!("Starting clients...");
461 if let Err(e) = self.start_clients() {
462 log::error!("Error starting clients: {e:?}");
463 }
464 log::info!("Clients started");
465
466 self.ts_started = Some(self.clock.borrow().timestamp_ns());
467 log::info!("Started");
468 }
469
470 pub fn start_trader(&mut self) {
474 log::info!("Starting trader...");
475 if let Err(e) = self.trader.start() {
476 log::error!("Error starting trader: {e:?}");
477 }
478 log::info!("Trader started");
479 }
480
481 pub fn stop_trader(&mut self) {
487 log::info!("Stopping");
488
489 if let Err(e) = self.trader.stop() {
491 log::error!("Error stopping trader: {e:?}");
492 }
493 }
494
495 pub async fn finalize_stop(&mut self) {
500 if let Err(e) = self.stop_all_clients() {
502 log::error!("Error stopping clients: {e:?}");
503 }
504
505 self.stop_engines();
506 self.cancel_timers();
507
508 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
509 log::info!("Stopped");
510 }
511
512 pub fn reset(&mut self) {
514 log::info!("Resetting");
515
516 if let Err(e) = self.trader.reset() {
517 log::error!("Error resetting trader: {e:?}");
518 }
519
520 self.data_engine.borrow_mut().reset();
521 self.exec_engine.borrow_mut().reset();
522 self.risk_engine.borrow_mut().reset();
523
524 self.ts_started = None;
525 self.ts_shutdown = None;
526
527 log::info!("Reset");
528 }
529
530 pub fn dispose(&mut self) {
532 log::info!("Disposing");
533
534 if let Err(e) = self.trader.dispose() {
535 log::error!("Error disposing trader: {e:?}");
536 }
537
538 self.stop_engines();
539
540 self.data_engine.borrow_mut().dispose();
541 self.exec_engine.borrow_mut().dispose();
542 self.risk_engine.borrow_mut().dispose();
543
544 log::info!("Disposed");
545 }
546
547 fn start_engines(&self) {
549 self.data_engine.borrow_mut().start();
550 self.exec_engine.borrow_mut().start();
551 self.risk_engine.borrow_mut().start();
552 }
553
554 fn stop_engines(&self) {
556 self.data_engine.borrow_mut().stop();
557 self.exec_engine.borrow_mut().stop();
558 self.risk_engine.borrow_mut().stop();
559 }
560
561 fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
566 let mut errors = Vec::new();
567
568 {
569 let mut exec_engine = self.exec_engine.borrow_mut();
570 let exec_adapters = exec_engine.get_clients_mut();
571
572 for adapter in exec_adapters {
573 if let Err(e) = adapter.start() {
574 log::error!("Error starting execution client {}: {e}", adapter.client_id);
575 errors.push(e);
576 }
577 }
578 }
579
580 if errors.is_empty() {
581 Ok(())
582 } else {
583 Err(errors)
584 }
585 }
586
587 fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
592 let mut errors = Vec::new();
593
594 {
595 let mut exec_engine = self.exec_engine.borrow_mut();
596 let exec_adapters = exec_engine.get_clients_mut();
597
598 for adapter in exec_adapters {
599 if let Err(e) = adapter.stop() {
600 log::error!("Error stopping execution client {}: {e}", adapter.client_id);
601 errors.push(e);
602 }
603 }
604 }
605
606 if errors.is_empty() {
607 Ok(())
608 } else {
609 Err(errors)
610 }
611 }
612
613 #[allow(clippy::await_holding_refcell_ref)] pub async fn connect_clients(&mut self) -> anyhow::Result<()> {
620 log::info!("Connecting clients...");
621 self.data_engine.borrow_mut().connect().await?;
622 self.exec_engine.borrow_mut().connect().await?;
623 Ok(())
624 }
625
626 #[allow(clippy::await_holding_refcell_ref)] pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
633 log::info!("Disconnecting clients...");
634 self.data_engine.borrow_mut().disconnect().await?;
635 self.exec_engine.borrow_mut().disconnect().await?;
636 Ok(())
637 }
638
639 #[must_use]
641 pub fn check_engines_connected(&self) -> bool {
642 self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
643 }
644
645 #[must_use]
647 pub fn check_engines_disconnected(&self) -> bool {
648 self.data_engine.borrow().check_disconnected()
649 && self.exec_engine.borrow().check_disconnected()
650 }
651}