1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::{Ref, RefCell},
23 rc::Rc,
24 time::Duration,
25};
26
27#[cfg(feature = "live")]
28use nautilus_common::live::clock::LiveClock;
29use nautilus_common::{
30 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
31 clock::{Clock, TestClock},
32 component::Component,
33 enums::Environment,
34 logging::{
35 headers, init_logging, init_tracing,
36 logger::{LogGuard, LoggerConfig},
37 writer::FileWriterConfig,
38 },
39 messages::{DataResponse, data::DataCommand, execution::TradingCommand},
40 msgbus::{
41 self, MessageBus, get_message_bus,
42 handler::{ShareableMessageHandler, TypedMessageHandler},
43 set_message_bus,
44 switchboard::MessagingSwitchboard,
45 },
46 runner::get_data_cmd_sender,
47};
48use nautilus_core::{UUID4, UnixNanos, WeakCell};
49use nautilus_data::engine::DataEngine;
50use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
51use nautilus_model::{
52 enums::OrderStatus,
53 events::{OrderCanceled, OrderEventAny, OrderExpired},
54 identifiers::TraderId,
55 orders::Order,
56 reports::OrderStatusReport,
57};
58use nautilus_portfolio::portfolio::Portfolio;
59use nautilus_risk::engine::RiskEngine;
60use ustr::Ustr;
61
62use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
63
/// Core kernel that owns the shared cache, clock, portfolio, engines and trader
/// and wires them together over the message bus.
#[derive(Debug)]
pub struct NautilusKernel {
    /// Name given to the kernel at construction (also passed to the message bus).
    pub name: String,
    /// Instance ID taken from the config, or the `UUID4` default when the config supplies none.
    pub instance_id: UUID4,
    /// Host name of the machine, as reported by `sysinfo`.
    pub machine_id: String,
    /// The configuration the kernel was built from.
    pub config: Box<dyn NautilusKernelConfig>,
    /// Cache shared with the portfolio, engines, order emulator and trader.
    pub cache: Rc<RefCell<Cache>>,
    /// Clock shared with the portfolio, engines, order emulator and trader.
    pub clock: Rc<RefCell<dyn Clock>>,
    /// Portfolio shared with the trader (the risk engine receives a shallow clone).
    pub portfolio: Rc<RefCell<Portfolio>>,
    /// Guard returned by logging initialization; stored on the kernel.
    // NOTE(review): presumably keeps the logger alive until the kernel is dropped — confirm.
    pub log_guard: LogGuard,
    /// Data engine; registered on the message bus for execute/process/response endpoints.
    pub data_engine: Rc<RefCell<DataEngine>>,
    /// Risk engine; registered on the message bus for the risk execute endpoint.
    pub risk_engine: Rc<RefCell<RiskEngine>>,
    /// Execution engine; registered on the message bus for execute/process/reconcile endpoints.
    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
    /// Order emulator adapter built from the shared clock and cache.
    pub order_emulator: OrderEmulatorAdapter,
    /// Trader built from the shared clock, cache and portfolio.
    pub trader: Trader,
    /// Clock timestamp taken at the end of construction.
    pub ts_created: UnixNanos,
    /// Clock timestamp set by `start_async`; cleared by `reset`.
    pub ts_started: Option<UnixNanos>,
    /// Clock timestamp set by `finalize_stop`; cleared by `reset`.
    pub ts_shutdown: Option<UnixNanos>,
}
102
103impl NautilusKernel {
/// Returns a [`NautilusKernelBuilder`] created from the given name, trader ID and environment.
#[must_use]
pub const fn builder(
    name: String,
    trader_id: TraderId,
    environment: Environment,
) -> NautilusKernelBuilder {
    NautilusKernelBuilder::new(name, trader_id, environment)
}
113
/// Creates a new kernel with the given `name` and `config`.
///
/// Construction initializes logging, creates the clock and cache, installs the
/// message bus, builds the portfolio and engines, and registers message-bus
/// handlers for the data, risk and execution engine endpoints.
///
/// # Errors
///
/// Returns an error if the machine host name cannot be determined, or if
/// logging/tracing initialization fails.
pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
    // Falls back to the `UUID4` default when the config carries no instance ID.
    let instance_id = config.instance_id().unwrap_or_default();
    let machine_id = Self::determine_machine_id()?;

    // Logging is set up before any other component is built.
    let logger_config = config.logging();
    let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
    // NOTE(review): the header component is hard-coded to "LiveNode" for every environment.
    headers::log_header(
        config.trader_id(),
        &machine_id,
        instance_id,
        Ustr::from(stringify!(LiveNode)),
    );

    log::info!("Building system kernel");

    let clock = Self::initialize_clock(&config.environment());
    let cache = Self::initialize_cache(config.cache());

    // The bus is handed to `set_message_bus`; the kernel keeps no handle of its own
    // (`msgbus()` fetches it again through `get_message_bus`).
    let msgbus = Rc::new(RefCell::new(MessageBus::new(
        config.trader_id(),
        instance_id,
        Some(name.clone()),
        None,
    )));
    set_message_bus(msgbus);

    let portfolio = Rc::new(RefCell::new(Portfolio::new(
        cache.clone(),
        clock.clone(),
        config.portfolio(),
    )));

    let risk_engine = RiskEngine::new(
        config.risk_engine().unwrap_or_default(),
        portfolio.borrow().clone_shallow(),
        clock.clone(),
        cache.clone(),
    );
    let risk_engine = Rc::new(RefCell::new(risk_engine));

    let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
    let exec_engine = Rc::new(RefCell::new(exec_engine));

    let order_emulator = OrderEmulatorAdapter::new(clock.clone(), cache.clone());

    let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
    let data_engine = Rc::new(RefCell::new(data_engine));

    // Handlers capture weak references, so a registered handler never holds a
    // strong reference to an engine; each upgrades the reference per message.
    let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));

    // Data engine: execute commands directly.
    let data_engine_weak_clone1 = data_engine_weak.clone();
    let endpoint = MessagingSwitchboard::data_engine_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &DataCommand| {
            if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
                engine_rc.borrow_mut().execute(cmd);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Data engine: queued execution forwards a clone of the command to the
    // data command sender instead of calling the engine directly.
    let endpoint = MessagingSwitchboard::data_engine_queue_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &DataCommand| {
            get_data_cmd_sender().clone().execute(cmd.clone());
        },
    )));
    msgbus::register(endpoint, handler);

    // Data engine: process incoming data of any type.
    let endpoint = MessagingSwitchboard::data_engine_process();
    let data_engine_weak2 = data_engine_weak.clone();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
        move |data: &dyn Any| {
            if let Some(engine_rc) = data_engine_weak2.upgrade() {
                engine_rc.borrow_mut().process(data);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Data engine: handle data responses.
    let endpoint = MessagingSwitchboard::data_engine_response();
    let data_engine_weak3 = data_engine_weak;
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |resp: &DataResponse| {
            if let Some(engine_rc) = data_engine_weak3.upgrade() {
                engine_rc.borrow_mut().response(resp.clone());
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Risk engine: execute trading commands.
    let risk_engine_weak = WeakCell::from(Rc::downgrade(&risk_engine));
    let endpoint = MessagingSwitchboard::risk_engine_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &TradingCommand| {
            if let Some(engine_rc) = risk_engine_weak.upgrade() {
                engine_rc.borrow_mut().execute(cmd.clone());
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Execution engine: execute trading commands (shared borrow only).
    let exec_engine_weak = WeakCell::from(Rc::downgrade(&exec_engine));
    let exec_engine_weak_clone = exec_engine_weak.clone();
    let endpoint = MessagingSwitchboard::exec_engine_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &TradingCommand| {
            if let Some(engine_rc) = exec_engine_weak.upgrade() {
                engine_rc.borrow().execute(cmd);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Execution engine: process order events; unlike the handlers above, a
    // dropped engine is reported as an error here.
    let endpoint = MessagingSwitchboard::exec_engine_process();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |event: &OrderEventAny| {
            if let Some(engine_rc) = exec_engine_weak_clone.upgrade() {
                engine_rc.borrow_mut().process(event);
            } else {
                log::error!(
                    "ExecEngine dropped, cannot process order event: {:?}",
                    event.client_order_id()
                );
            }
        },
    )));
    msgbus::register(endpoint, handler);

    let cache_weak = WeakCell::from(Rc::downgrade(&cache));
    let exec_engine_weak2 = WeakCell::from(Rc::downgrade(&exec_engine));
    let trader_id = config.trader_id();

    // Execution engine: reconcile a single order status report against the cache.
    let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |report: &OrderStatusReport| {
            let Some(cache_rc) = cache_weak.upgrade() else {
                log::error!("Cache dropped, cannot reconcile order status report");
                return;
            };
            let Some(exec_engine_rc) = exec_engine_weak2.upgrade() else {
                log::error!("ExecEngine dropped, cannot reconcile order status report");
                return;
            };

            let cache = cache_rc.borrow();

            // Look the order up by client order ID first, then fall back to
            // resolving the client order ID from the venue order ID.
            let order = report
                .client_order_id
                .and_then(|id| cache.order(&id).cloned())
                .or_else(|| {
                    cache
                        .client_order_id(&report.venue_order_id)
                        .and_then(|cid| cache.order(cid).cloned())
                });

            let Some(order) = order else {
                log::debug!(
                    "Order not found in cache for reconciliation: client_order_id={:?}, venue_order_id={}",
                    report.client_order_id,
                    report.venue_order_id
                );
                return;
            };

            // Nothing to do when the cached order already has the reported status.
            if order.status() == report.order_status {
                return;
            }

            // Orders that are no longer open are left untouched.
            if !order.is_open() {
                return;
            }

            // Release the cache borrow before the execution engine processes the
            // generated event (`order` is an owned clone, so it stays usable).
            drop(cache);
            // Only canceled and expired reports generate an event; every other
            // status is ignored.
            let event: Option<OrderEventAny> = match report.order_status {
                OrderStatus::Canceled => {
                    log::debug!(
                        "Reconciling canceled order: client_order_id={}, venue_order_id={}",
                        order.client_order_id(),
                        report.venue_order_id
                    );
                    Some(OrderEventAny::Canceled(OrderCanceled::new(
                        trader_id,
                        order.strategy_id(),
                        order.instrument_id(),
                        order.client_order_id(),
                        UUID4::new(),
                        report.ts_last,
                        report.ts_init,
                        // NOTE(review): presumably the reconciliation flag — confirm against `OrderCanceled::new`.
                        true,
                        Some(report.venue_order_id),
                        Some(report.account_id),
                    )))
                }
                OrderStatus::Expired => {
                    log::debug!(
                        "Reconciling expired order: client_order_id={}, venue_order_id={}",
                        order.client_order_id(),
                        report.venue_order_id
                    );
                    Some(OrderEventAny::Expired(OrderExpired::new(
                        trader_id,
                        order.strategy_id(),
                        order.instrument_id(),
                        order.client_order_id(),
                        UUID4::new(),
                        report.ts_last,
                        report.ts_init,
                        // NOTE(review): presumably the reconciliation flag — confirm against `OrderExpired::new`.
                        true,
                        Some(report.venue_order_id),
                        Some(report.account_id),
                    )))
                }
                _ => None,
            };

            if let Some(evt) = event {
                exec_engine_rc.borrow_mut().process(&evt);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Execution engine: mass status reconciliation currently only logs the
    // type ID of the received report.
    let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_mass_status();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
        move |report: &dyn Any| {
            log::debug!(
                "Received execution mass status for reconciliation: {:?}",
                report.type_id()
            );
        },
    )));
    msgbus::register(endpoint, handler);

    let trader = Trader::new(
        config.trader_id(),
        instance_id,
        config.environment(),
        clock.clone(),
        cache.clone(),
        portfolio.clone(),
    );

    let ts_created = clock.borrow().timestamp_ns();

    Ok(Self {
        name,
        instance_id,
        machine_id,
        config: Box::new(config),
        cache,
        clock,
        portfolio,
        log_guard,
        data_engine,
        risk_engine,
        exec_engine,
        order_emulator,
        trader,
        ts_created,
        ts_started: None,
        ts_shutdown: None,
    })
}
392
393 fn determine_machine_id() -> anyhow::Result<String> {
394 sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
395 }
396
397 fn initialize_logging(
398 trader_id: TraderId,
399 instance_id: UUID4,
400 config: LoggerConfig,
401 ) -> anyhow::Result<LogGuard> {
402 let log_guard = init_logging(
403 trader_id,
404 instance_id,
405 config,
406 FileWriterConfig::default(), )?;
408
409 init_tracing()?;
410
411 Ok(log_guard)
412 }
413
414 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
415 match environment {
416 Environment::Backtest => {
417 let test_clock = TestClock::new();
418 Rc::new(RefCell::new(test_clock))
419 }
420 #[cfg(feature = "live")]
421 Environment::Live | Environment::Sandbox => {
422 let live_clock = LiveClock::default();
423 Rc::new(RefCell::new(live_clock))
424 }
425 #[cfg(not(feature = "live"))]
426 Environment::Live | Environment::Sandbox => {
427 panic!(
428 "Live/Sandbox environment requires the 'live' feature to be enabled. \
429 Build with `--features live` or add `features = [\"live\"]` to your dependency."
430 );
431 }
432 }
433 }
434
435 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
436 let cache_config = cache_config.unwrap_or_default();
437
438 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
440 let cache = Cache::new(Some(cache_config), cache_database);
441
442 Rc::new(RefCell::new(cache))
443 }
444
/// Cancels all timers on the kernel clock.
fn cancel_timers(&self) {
    self.clock.borrow_mut().cancel_timers();
}
448
/// Returns the current timestamp from the kernel clock.
#[must_use]
pub fn generate_timestamp_ns(&self) -> UnixNanos {
    self.clock.borrow().timestamp_ns()
}
453
/// Returns the environment reported by the kernel configuration.
#[must_use]
pub fn environment(&self) -> Environment {
    self.config.environment()
}
459
/// Returns the kernel's name.
#[must_use]
pub const fn name(&self) -> &str {
    self.name.as_str()
}
465
/// Returns the trader ID reported by the kernel configuration.
#[must_use]
pub fn trader_id(&self) -> TraderId {
    self.config.trader_id()
}
471
/// Returns the kernel's machine ID.
#[must_use]
pub fn machine_id(&self) -> &str {
    &self.machine_id
}
477
/// Returns the kernel's instance ID.
#[must_use]
pub const fn instance_id(&self) -> UUID4 {
    self.instance_id
}
483
/// Returns the post-stop delay reported by the kernel configuration.
#[must_use]
pub fn delay_post_stop(&self) -> Duration {
    self.config.delay_post_stop()
}
489
/// Returns the timestamp recorded when the kernel was created.
#[must_use]
pub const fn ts_created(&self) -> UnixNanos {
    self.ts_created
}
495
/// Returns the timestamp of the last start, or `None` if no start timestamp is recorded.
#[must_use]
pub const fn ts_started(&self) -> Option<UnixNanos> {
    self.ts_started
}
501
/// Returns the timestamp of the last shutdown, or `None` if no shutdown timestamp is recorded.
#[must_use]
pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
    self.ts_shutdown
}
507
/// Returns the `load_state` setting reported by the kernel configuration.
#[must_use]
pub fn load_state(&self) -> bool {
    self.config.load_state()
}
513
/// Returns the `save_state` setting reported by the kernel configuration.
#[must_use]
pub fn save_state(&self) -> bool {
    self.config.save_state()
}
519
520 #[must_use]
522 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
523 self.clock.clone()
524 }
525
526 #[must_use]
528 pub fn cache(&self) -> Rc<RefCell<Cache>> {
529 self.cache.clone()
530 }
531
/// Returns the message bus obtained from `get_message_bus` (the kernel stores
/// no bus handle of its own).
#[must_use]
pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
    get_message_bus()
}
537
/// Returns a shared borrow of the kernel portfolio.
///
/// # Panics
///
/// Panics if the portfolio is currently mutably borrowed.
#[must_use]
pub fn portfolio(&self) -> Ref<'_, Portfolio> {
    self.portfolio.borrow()
}
543
/// Returns a shared borrow of the kernel data engine.
///
/// # Panics
///
/// Panics if the data engine is currently mutably borrowed.
#[must_use]
pub fn data_engine(&self) -> Ref<'_, DataEngine> {
    self.data_engine.borrow()
}
549
/// Returns a reference to the shared risk engine handle.
#[must_use]
pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
    &self.risk_engine
}
555
/// Returns a reference to the shared execution engine handle.
#[must_use]
pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
    &self.exec_engine
}
561
/// Returns a reference to the kernel's trader.
#[must_use]
pub const fn trader(&self) -> &Trader {
    &self.trader
}
567
568 pub async fn start_async(&mut self) {
570 log::info!("Starting");
571 self.start_engines();
572
573 log::info!("Initializing trader");
574 if let Err(e) = self.trader.initialize() {
575 log::error!("Error initializing trader: {e:?}");
576 return;
577 }
578
579 log::info!("Starting clients...");
580 if let Err(e) = self.start_clients() {
581 log::error!("Error starting clients: {e:?}");
582 }
583 log::info!("Clients started");
584
585 if let Err(e) = self.trader.start() {
586 log::error!("Error starting trader: {e:?}");
587 }
588
589 self.ts_started = Some(self.clock.borrow().timestamp_ns());
590 log::info!("Started");
591 }
592
593 pub fn stop_trader(&mut self) {
599 log::info!("Stopping");
600
601 if let Err(e) = self.trader.stop() {
603 log::error!("Error stopping trader: {e:?}");
604 }
605 }
606
607 pub async fn finalize_stop(&mut self) {
612 if let Err(e) = self.stop_all_clients() {
614 log::error!("Error stopping clients: {e:?}");
615 }
616
617 self.stop_engines();
618 self.cancel_timers();
619
620 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
621 log::info!("Stopped");
622 }
623
624 pub fn reset(&mut self) {
626 log::info!("Resetting");
627
628 if let Err(e) = self.trader.reset() {
629 log::error!("Error resetting trader: {e:?}");
630 }
631
632 self.data_engine.borrow_mut().reset();
634 self.ts_started = None;
637 self.ts_shutdown = None;
638
639 log::info!("Reset");
640 }
641
642 pub fn dispose(&mut self) {
644 log::info!("Disposing");
645
646 if let Err(e) = self.trader.dispose() {
647 log::error!("Error disposing trader: {e:?}");
648 }
649
650 self.stop_engines();
651
652 self.data_engine.borrow_mut().dispose();
653 log::info!("Disposed");
656 }
657
/// Placeholder: currently a no-op.
const fn cancel_all_tasks(&self) {
}
664
/// Starts the data engine (the only engine started here).
fn start_engines(&self) {
    self.data_engine.borrow_mut().start();
}
670
/// Stops the data engine (the only engine stopped here).
fn stop_engines(&self) {
    self.data_engine.borrow_mut().stop();
}
676
677 fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
682 let mut errors = Vec::new();
683
684 {
685 let mut exec_engine = self.exec_engine.borrow_mut();
686 let exec_adapters = exec_engine.get_clients_mut();
687
688 for adapter in exec_adapters {
689 if let Err(e) = adapter.start() {
690 log::error!("Error starting execution client {}: {e}", adapter.client_id);
691 errors.push(e);
692 }
693 }
694 }
695
696 if errors.is_empty() {
697 Ok(())
698 } else {
699 Err(errors)
700 }
701 }
702
703 fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
708 let mut errors = Vec::new();
709
710 {
711 let mut exec_engine = self.exec_engine.borrow_mut();
712 let exec_adapters = exec_engine.get_clients_mut();
713
714 for adapter in exec_adapters {
715 if let Err(e) = adapter.stop() {
716 log::error!("Error stopping execution client {}: {e}", adapter.client_id);
717 errors.push(e);
718 }
719 }
720 }
721
722 if errors.is_empty() {
723 Ok(())
724 } else {
725 Err(errors)
726 }
727 }
728
/// Stops the data engine.
// NOTE(review): despite the name, this does the same thing as `stop_engines`
// and does not touch any execution client — confirm the intended behavior.
fn stop_clients(&self) {
    self.data_engine.borrow_mut().stop();
}
733
734 #[allow(clippy::await_holding_refcell_ref)] pub async fn connect_clients(&mut self) -> anyhow::Result<()> {
741 log::info!("Connecting clients...");
742 self.data_engine.borrow_mut().connect().await?;
743 self.exec_engine.borrow_mut().connect().await?;
744 Ok(())
745 }
746
747 #[allow(clippy::await_holding_refcell_ref)] pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
754 log::info!("Disconnecting clients...");
755 self.data_engine.borrow_mut().disconnect().await?;
756 self.exec_engine.borrow_mut().disconnect().await?;
757 Ok(())
758 }
759
/// Placeholder: currently a no-op.
const fn initialize_portfolio(&self) {
}
764
/// Placeholder: currently a no-op.
const fn await_execution_reconciliation(&self) {
}
771
/// Placeholder: currently a no-op.
const fn await_portfolio_initialized(&self) {
}
778
/// Placeholder: currently a no-op.
const fn await_trader_residuals(&self) {
}
785
786 #[must_use]
788 pub fn check_engines_connected(&self) -> bool {
789 self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
790 }
791
792 #[must_use]
794 pub fn check_engines_disconnected(&self) -> bool {
795 self.data_engine.borrow().check_disconnected()
796 && self.exec_engine.borrow().check_disconnected()
797 }
798
/// Placeholder: currently a no-op.
const fn check_portfolio_initialized(&self) {
}
803
/// Placeholder: currently a no-op.
const fn flush_writer(&self) {
}
808}