1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::{Ref, RefCell},
23 rc::Rc,
24};
25
26#[cfg(feature = "live")]
27use nautilus_common::live::clock::LiveClock;
28use nautilus_common::{
29 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
30 clock::{Clock, TestClock},
31 component::Component,
32 enums::Environment,
33 logging::{
34 headers, init_logging, init_tracing,
35 logger::{LogGuard, LoggerConfig},
36 writer::FileWriterConfig,
37 },
38 messages::{DataResponse, data::DataCommand, execution::TradingCommand},
39 msgbus::{
40 self, MessageBus, get_message_bus,
41 handler::{ShareableMessageHandler, TypedMessageHandler},
42 set_message_bus,
43 switchboard::MessagingSwitchboard,
44 },
45 runner::get_data_cmd_sender,
46};
47use nautilus_core::{UUID4, UnixNanos};
48use nautilus_data::engine::DataEngine;
49use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
50use nautilus_model::{events::OrderEventAny, identifiers::TraderId};
51use nautilus_portfolio::portfolio::Portfolio;
52use nautilus_risk::engine::RiskEngine;
53use ustr::Ustr;
54
55use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
56
/// Core kernel of a Nautilus system instance.
///
/// Owns the shared cache, clock and portfolio, the data/risk/execution engines,
/// the order emulator and the trader, together with lifecycle timestamps.
#[derive(Debug)]
pub struct NautilusKernel {
    /// Name the kernel was constructed with.
    pub name: String,
    /// Instance ID of this kernel (from the configuration, or a default when absent).
    pub instance_id: UUID4,
    /// Machine identifier resolved from the host name at construction time.
    pub machine_id: String,
    /// Configuration the kernel was built from.
    pub config: Box<dyn NautilusKernelConfig>,
    /// Cache shared with the portfolio, engines, order emulator and trader.
    pub cache: Rc<RefCell<Cache>>,
    /// Clock shared with the portfolio, engines, order emulator and trader.
    pub clock: Rc<RefCell<dyn Clock>>,
    /// Portfolio shared with the trader.
    pub portfolio: Rc<RefCell<Portfolio>>,
    /// Guard returned by logging initialization; stored for the lifetime of the kernel.
    pub log_guard: LogGuard,
    /// Data engine.
    pub data_engine: Rc<RefCell<DataEngine>>,
    /// Risk engine.
    pub risk_engine: Rc<RefCell<RiskEngine>>,
    /// Execution engine.
    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
    /// Order emulator adapter built from the shared clock and cache.
    pub order_emulator: OrderEmulatorAdapter,
    /// Trader instance.
    pub trader: Trader,
    /// Clock timestamp (nanoseconds) taken when the kernel was constructed.
    pub ts_created: UnixNanos,
    /// Clock timestamp of the last start; `None` until started or after a reset.
    pub ts_started: Option<UnixNanos>,
    /// Clock timestamp of the last stop; `None` until stopped or after a reset.
    pub ts_shutdown: Option<UnixNanos>,
}
95
96impl NautilusKernel {
/// Creates a [`NautilusKernelBuilder`] for the given kernel name, trader ID and environment.
#[must_use]
pub const fn builder(
    name: String,
    trader_id: TraderId,
    environment: Environment,
) -> NautilusKernelBuilder {
    NautilusKernelBuilder::new(name, trader_id, environment)
}
106
/// Creates a new [`NautilusKernel`] from the given name and configuration.
///
/// Initializes logging, the clock, the cache, the global message bus, the portfolio,
/// the risk/execution/data engines and the trader, and registers message bus endpoint
/// handlers that forward commands, data, responses and order events to the engines.
///
/// # Errors
///
/// Returns an error if the machine ID cannot be determined or if logging/tracing
/// initialization fails.
pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
    // Fall back to a default instance ID when the config does not provide one.
    let instance_id = config.instance_id().unwrap_or_default();
    let machine_id = Self::determine_machine_id()?;

    // Logging is set up first so that the rest of construction can be logged.
    let logger_config = config.logging();
    let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
    // NOTE(review): the component name in the header is hard-coded to "LiveNode"
    // regardless of the configured environment — confirm this is intended.
    headers::log_header(
        config.trader_id(),
        &machine_id,
        instance_id,
        Ustr::from(stringify!(LiveNode)),
    );

    log::info!("Building system kernel");

    let clock = Self::initialize_clock(&config.environment());
    let cache = Self::initialize_cache(config.cache());

    // The message bus is installed globally; handlers below are registered against it.
    let msgbus = Rc::new(RefCell::new(MessageBus::new(
        config.trader_id(),
        instance_id,
        Some(name.clone()),
        None,
    )));
    set_message_bus(msgbus);

    let portfolio = Rc::new(RefCell::new(Portfolio::new(
        cache.clone(),
        clock.clone(),
        config.portfolio(),
    )));

    // NOTE(review): the risk engine receives its own `Portfolio` value built from the
    // same cache/clock/config; it is a separate instance from the shared `portfolio`
    // above — confirm this is intended.
    let risk_engine = RiskEngine::new(
        config.risk_engine().unwrap_or_default(),
        Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
        clock.clone(),
        cache.clone(),
    );
    let risk_engine = Rc::new(RefCell::new(risk_engine));

    let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
    let exec_engine = Rc::new(RefCell::new(exec_engine));

    // NOTE(review): the kernel stores this adapter by value; whether a shared handle
    // is needed cannot be determined from this file.
    let order_emulator = OrderEmulatorAdapter::new(clock.clone(), cache.clone());

    let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
    let data_engine = Rc::new(RefCell::new(data_engine));

    use nautilus_core::WeakCell;

    // Handlers capture weak references so that the registered closures do not keep
    // the engines alive; a handler whose engine has been dropped does nothing
    // (or logs an error, for order events).
    let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
    let data_engine_weak_clone1 = data_engine_weak.clone();
    // Data engine: direct command execution.
    let endpoint = MessagingSwitchboard::data_engine_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &DataCommand| {
            if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
                engine_rc.borrow_mut().execute(cmd);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Data engine: queued execution — the command is cloned and handed to the
    // data command sender instead of being executed inline.
    let endpoint = MessagingSwitchboard::data_engine_queue_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &DataCommand| {
            get_data_cmd_sender().clone().execute(cmd.clone());
        },
    )));
    msgbus::register(endpoint, handler);

    // Data engine: processing of incoming data (type-erased).
    let endpoint = MessagingSwitchboard::data_engine_process();
    let data_engine_weak2 = data_engine_weak.clone();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
        move |data: &dyn Any| {
            if let Some(engine_rc) = data_engine_weak2.upgrade() {
                engine_rc.borrow_mut().process(data);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Data engine: responses (cloned before being passed to the engine).
    let endpoint = MessagingSwitchboard::data_engine_response();
    let data_engine_weak3 = data_engine_weak;
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |resp: &DataResponse| {
            if let Some(engine_rc) = data_engine_weak3.upgrade() {
                engine_rc.borrow_mut().response(resp.clone());
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Risk engine: trading command execution.
    let risk_engine_weak = WeakCell::from(Rc::downgrade(&risk_engine));
    let endpoint = MessagingSwitchboard::risk_engine_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &TradingCommand| {
            if let Some(engine_rc) = risk_engine_weak.upgrade() {
                engine_rc.borrow_mut().execute(cmd.clone());
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Execution engine: trading command execution (uses a shared borrow, unlike
    // the other handlers).
    let exec_engine_weak = WeakCell::from(Rc::downgrade(&exec_engine));
    let exec_engine_weak_clone = exec_engine_weak.clone();
    let endpoint = MessagingSwitchboard::exec_engine_execute();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |cmd: &TradingCommand| {
            if let Some(engine_rc) = exec_engine_weak.upgrade() {
                engine_rc.borrow().execute(cmd);
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Execution engine: order event processing; a dropped engine is logged as an error.
    let endpoint = MessagingSwitchboard::exec_engine_process();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |event: &OrderEventAny| {
            if let Some(engine_rc) = exec_engine_weak_clone.upgrade() {
                engine_rc.borrow_mut().process(event);
            } else {
                log::error!(
                    "ExecEngine dropped, cannot process order event: {:?}",
                    event.client_order_id()
                );
            }
        },
    )));
    msgbus::register(endpoint, handler);

    // Reconciliation endpoints: the handlers currently only log the received type ID
    // at debug level; nothing is forwarded to the execution engine.
    let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
        move |report: &dyn Any| {
            log::debug!(
                "Received execution report for reconciliation: {:?}",
                report.type_id()
            );
        },
    )));
    msgbus::register(endpoint, handler);

    let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_mass_status();
    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
        move |report: &dyn Any| {
            log::debug!(
                "Received execution mass status for reconciliation: {:?}",
                report.type_id()
            );
        },
    )));
    msgbus::register(endpoint, handler);

    // The trader shares the kernel's clock, cache and portfolio.
    let trader = Trader::new(
        config.trader_id(),
        instance_id,
        config.environment(),
        clock.clone(),
        cache.clone(),
        portfolio.clone(),
    );

    let ts_created = clock.borrow().timestamp_ns();

    Ok(Self {
        name,
        instance_id,
        machine_id,
        config: Box::new(config),
        cache,
        clock,
        portfolio,
        log_guard,
        data_engine,
        risk_engine,
        exec_engine,
        order_emulator,
        trader,
        ts_created,
        ts_started: None,
        ts_shutdown: None,
    })
}
303
304 fn determine_machine_id() -> anyhow::Result<String> {
305 Ok(hostname::get()?.to_string_lossy().into_owned())
306 }
307
308 fn initialize_logging(
309 trader_id: TraderId,
310 instance_id: UUID4,
311 config: LoggerConfig,
312 ) -> anyhow::Result<LogGuard> {
313 let log_guard = init_logging(
314 trader_id,
315 instance_id,
316 config,
317 FileWriterConfig::default(), )?;
319
320 init_tracing()?;
321
322 Ok(log_guard)
323 }
324
325 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
326 match environment {
327 Environment::Backtest => {
328 let test_clock = TestClock::new();
329 Rc::new(RefCell::new(test_clock))
330 }
331 #[cfg(feature = "live")]
332 Environment::Live | Environment::Sandbox => {
333 let live_clock = LiveClock::default();
334 Rc::new(RefCell::new(live_clock))
335 }
336 #[cfg(not(feature = "live"))]
337 Environment::Live | Environment::Sandbox => {
338 panic!(
339 "Live/Sandbox environment requires the 'live' feature to be enabled. \
340 Build with `--features live` or add `features = [\"live\"]` to your dependency."
341 );
342 }
343 }
344 }
345
346 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
347 let cache_config = cache_config.unwrap_or_default();
348
349 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
351 let cache = Cache::new(Some(cache_config), cache_database);
352
353 Rc::new(RefCell::new(cache))
354 }
355
356 fn cancel_timers(&self) {
357 self.clock.borrow_mut().cancel_timers();
358 }
359
360 #[must_use]
361 pub fn generate_timestamp_ns(&self) -> UnixNanos {
362 self.clock.borrow().timestamp_ns()
363 }
364
/// Returns the environment from the kernel configuration.
#[must_use]
pub fn environment(&self) -> Environment {
    self.config.environment()
}
370
/// Returns the kernel name.
#[must_use]
pub const fn name(&self) -> &str {
    self.name.as_str()
}
376
/// Returns the trader ID from the kernel configuration.
#[must_use]
pub fn trader_id(&self) -> TraderId {
    self.config.trader_id()
}
382
383 #[must_use]
385 pub fn machine_id(&self) -> &str {
386 &self.machine_id
387 }
388
/// Returns the instance ID of the kernel.
#[must_use]
pub const fn instance_id(&self) -> UUID4 {
    self.instance_id
}
394
/// Returns the timestamp (nanoseconds) recorded when the kernel was created.
#[must_use]
pub const fn ts_created(&self) -> UnixNanos {
    self.ts_created
}
400
/// Returns the timestamp (nanoseconds) of the last start, or `None` if not set.
#[must_use]
pub const fn ts_started(&self) -> Option<UnixNanos> {
    self.ts_started
}
406
/// Returns the timestamp (nanoseconds) of the last shutdown, or `None` if not set.
#[must_use]
pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
    self.ts_shutdown
}
412
/// Returns the `load_state` flag from the kernel configuration.
#[must_use]
pub fn load_state(&self) -> bool {
    self.config.load_state()
}
418
/// Returns the `save_state` flag from the kernel configuration.
#[must_use]
pub fn save_state(&self) -> bool {
    self.config.save_state()
}
424
425 #[must_use]
427 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
428 self.clock.clone()
429 }
430
431 #[must_use]
433 pub fn cache(&self) -> Rc<RefCell<Cache>> {
434 self.cache.clone()
435 }
436
/// Returns the message bus.
///
/// The bus is not stored on the kernel; it is obtained via `get_message_bus()`.
#[must_use]
pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
    get_message_bus()
}
442
443 #[must_use]
445 pub fn portfolio(&self) -> Ref<'_, Portfolio> {
446 self.portfolio.borrow()
447 }
448
449 #[must_use]
451 pub fn data_engine(&self) -> Ref<'_, DataEngine> {
452 self.data_engine.borrow()
453 }
454
/// Returns a reference to the shared risk engine handle.
#[must_use]
pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
    &self.risk_engine
}
460
/// Returns a reference to the shared execution engine handle.
#[must_use]
pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
    &self.exec_engine
}
466
/// Returns a reference to the kernel trader.
#[must_use]
pub const fn trader(&self) -> &Trader {
    &self.trader
}
472
473 pub async fn start_async(&mut self) {
475 log::info!("Starting");
476 self.start_engines();
477
478 log::info!("Initializing trader");
479 if let Err(e) = self.trader.initialize() {
480 log::error!("Error initializing trader: {e:?}");
481 return;
482 }
483
484 log::info!("Starting clients...");
485 if let Err(e) = self.start_clients() {
486 log::error!("Error starting clients: {e:?}");
487 }
488 log::info!("Clients started");
489
490 if let Err(e) = self.trader.start() {
491 log::error!("Error starting trader: {e:?}");
492 }
493
494 self.ts_started = Some(self.clock.borrow().timestamp_ns());
495 log::info!("Started");
496 }
497
/// Stops the kernel: trader, execution clients, engines and clock timers.
///
/// Failures are logged rather than returned. With the `live` feature enabled, the
/// method waits for the configured post-stop delay between stopping the trader and
/// stopping the clients. `ts_shutdown` is set from the kernel clock at the end.
pub async fn stop_async(&mut self) {
    log::info!("Stopping");

    if let Err(e) = self.trader.stop() {
        log::error!("Error stopping trader: {e:?}");
    }

    #[cfg(feature = "live")]
    {
        let delay = self.config.delay_post_stop();
        log::info!("Awaiting residual events ({delay:?})...");
        // NOTE(review): `std::thread::sleep` blocks the executing thread inside an
        // `async fn`, so no other task on this thread can make progress (including
        // the residual events being awaited, if they are delivered on it). An async
        // sleep from the runtime in use would avoid this — confirm which runtime
        // drives this future before changing it.
        std::thread::sleep(delay);
    }

    if let Err(e) = self.stop_all_clients() {
        log::error!("Error stopping clients: {e:?}");
    }

    self.stop_engines();
    self.cancel_timers();

    self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
    log::info!("Stopped");
}
526
527 pub fn reset(&mut self) {
529 log::info!("Resetting");
530
531 if let Err(e) = self.trader.reset() {
532 log::error!("Error resetting trader: {e:?}");
533 }
534
535 self.data_engine.borrow_mut().reset();
537 self.ts_started = None;
540 self.ts_shutdown = None;
541
542 log::info!("Reset");
543 }
544
545 pub fn dispose(&mut self) {
547 log::info!("Disposing");
548
549 if let Err(e) = self.trader.dispose() {
550 log::error!("Error disposing trader: {e:?}");
551 }
552
553 self.stop_engines();
554
555 self.data_engine.borrow_mut().dispose();
556 log::info!("Disposed");
559 }
560
/// Placeholder, presumably for cancelling outstanding tasks.
///
/// The body is empty, so calling this currently does nothing.
const fn cancel_all_tasks(&self) {
    // Empty body: nothing is cancelled yet.
}
567
568 fn start_engines(&self) {
570 self.data_engine.borrow_mut().start();
571 }
573
574 fn stop_engines(&self) {
576 self.data_engine.borrow_mut().stop();
577 }
579
580 fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
585 let mut errors = Vec::new();
586
587 {
588 let mut exec_engine = self.exec_engine.borrow_mut();
589 let exec_adapters = exec_engine.get_clients_mut();
590
591 for adapter in exec_adapters {
592 if let Err(e) = adapter.start() {
593 log::error!("Error starting execution client {}: {e}", adapter.client_id);
594 errors.push(e);
595 }
596 }
597 }
598
599 if errors.is_empty() {
600 Ok(())
601 } else {
602 Err(errors)
603 }
604 }
605
606 fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
611 let mut errors = Vec::new();
612
613 {
614 let mut exec_engine = self.exec_engine.borrow_mut();
615 let exec_adapters = exec_engine.get_clients_mut();
616
617 for adapter in exec_adapters {
618 if let Err(e) = adapter.stop() {
619 log::error!("Error stopping execution client {}: {e}", adapter.client_id);
620 errors.push(e);
621 }
622 }
623 }
624
625 if errors.is_empty() {
626 Ok(())
627 } else {
628 Err(errors)
629 }
630 }
631
632 fn stop_clients(&self) {
634 self.data_engine.borrow_mut().stop();
635 }
636
/// Connects the data engine and then the execution engine.
///
/// The execution engine is not attempted if connecting the data engine fails.
/// Each engine stays mutably borrowed while its `connect()` future is awaited,
/// which is why the clippy lint is allowed here.
///
/// # Errors
///
/// Returns the first error produced by either engine's `connect()`.
#[allow(clippy::await_holding_refcell_ref)]
pub async fn connect_clients(&mut self) -> anyhow::Result<()> {
    log::info!("Connecting clients...");
    self.data_engine.borrow_mut().connect().await?;
    self.exec_engine.borrow_mut().connect().await?;
    Ok(())
}
649
/// Disconnects the data engine and then the execution engine.
///
/// The execution engine is not attempted if disconnecting the data engine fails.
/// Each engine stays mutably borrowed while its `disconnect()` future is awaited,
/// which is why the clippy lint is allowed here.
///
/// # Errors
///
/// Returns the first error produced by either engine's `disconnect()`.
#[allow(clippy::await_holding_refcell_ref)]
pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
    log::info!("Disconnecting clients...");
    self.data_engine.borrow_mut().disconnect().await?;
    self.exec_engine.borrow_mut().disconnect().await?;
    Ok(())
}
662
/// Placeholder, presumably for portfolio initialization.
///
/// The body is empty, so calling this currently does nothing.
const fn initialize_portfolio(&self) {
    // Empty body: no initialization is performed yet.
}
667
/// Placeholder, presumably for awaiting execution reconciliation.
///
/// The body is empty, so calling this currently does nothing and never waits.
const fn await_execution_reconciliation(&self) {
    // Empty body: nothing is awaited yet.
}
674
/// Placeholder, presumably for awaiting portfolio initialization.
///
/// The body is empty, so calling this currently does nothing and never waits.
const fn await_portfolio_initialized(&self) {
    // Empty body: nothing is awaited yet.
}
681
/// Placeholder, presumably for awaiting residual trader events.
///
/// The body is empty, so calling this currently does nothing and never waits.
const fn await_trader_residuals(&self) {
    // Empty body: nothing is awaited yet.
}
688
689 #[must_use]
691 pub fn check_engines_connected(&self) -> bool {
692 self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
693 }
694
695 #[must_use]
697 pub fn check_engines_disconnected(&self) -> bool {
698 self.data_engine.borrow().check_disconnected()
699 && self.exec_engine.borrow().check_disconnected()
700 }
701
/// Placeholder, presumably for checking that the portfolio is initialized.
///
/// The body is empty and the function returns `()`, so no check is performed.
const fn check_portfolio_initialized(&self) {
    // Empty body: nothing is checked yet.
}
706
/// Placeholder, presumably for flushing a writer.
///
/// The body is empty, so calling this currently does nothing.
const fn flush_writer(&self) {
    // Empty body: nothing is flushed yet.
}
711}