1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::{Ref, RefCell},
23 rc::Rc,
24};
25
26use futures::future::join_all;
27use nautilus_common::{
28 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
29 clock::{Clock, LiveClock, TestClock},
30 component::Component,
31 enums::Environment,
32 logging::{
33 headers, init_logging, init_tracing,
34 logger::{LogGuard, LoggerConfig},
35 writer::FileWriterConfig,
36 },
37 messages::{DataResponse, data::DataCommand},
38 msgbus::{
39 self, MessageBus, get_message_bus,
40 handler::{ShareableMessageHandler, TypedMessageHandler},
41 set_message_bus,
42 switchboard::MessagingSwitchboard,
43 },
44 runner::get_data_cmd_sender,
45};
46use nautilus_core::{UUID4, UnixNanos};
47use nautilus_data::engine::DataEngine;
48use nautilus_execution::engine::ExecutionEngine;
49use nautilus_model::identifiers::TraderId;
50use nautilus_portfolio::portfolio::Portfolio;
51use nautilus_risk::engine::RiskEngine;
52use ustr::Ustr;
53
54use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
55
56#[derive(Debug)]
60pub struct NautilusKernel {
61 pub name: String,
63 pub instance_id: UUID4,
65 pub machine_id: String,
67 pub config: Box<dyn NautilusKernelConfig>,
69 pub cache: Rc<RefCell<Cache>>,
71 pub clock: Rc<RefCell<dyn Clock>>,
73 pub portfolio: Portfolio,
75 pub log_guard: LogGuard,
77 pub data_engine: Rc<RefCell<DataEngine>>,
79 pub risk_engine: RiskEngine,
81 pub exec_engine: ExecutionEngine,
83 pub trader: Trader,
85 pub ts_created: UnixNanos,
87 pub ts_started: Option<UnixNanos>,
89 pub ts_shutdown: Option<UnixNanos>,
91}
92
93impl NautilusKernel {
94 #[must_use]
96 pub const fn builder(
97 name: String,
98 trader_id: TraderId,
99 environment: nautilus_common::enums::Environment,
100 ) -> NautilusKernelBuilder {
101 NautilusKernelBuilder::new(name, trader_id, environment)
102 }
103
104 pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
110 let instance_id = config.instance_id().unwrap_or_default();
111 let machine_id = Self::determine_machine_id()?;
112
113 let logger_config = config.logging();
114 let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
115 headers::log_header(
116 config.trader_id(),
117 &machine_id,
118 instance_id,
119 Ustr::from(stringify!(LiveNode)),
120 );
121
122 log::info!("Building system kernel");
123
124 let clock = Self::initialize_clock(&config.environment());
125 let cache = Self::initialize_cache(config.cache());
126
127 let msgbus = Rc::new(RefCell::new(MessageBus::new(
128 config.trader_id(),
129 instance_id,
130 Some(name.to_string()),
131 None,
132 )));
133 set_message_bus(msgbus);
134
135 let portfolio = Portfolio::new(cache.clone(), clock.clone(), config.portfolio());
136 let risk_engine = RiskEngine::new(
137 config.risk_engine().unwrap_or_default(),
138 Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
139 clock.clone(),
140 cache.clone(),
141 );
142 let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
143
144 let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
145 let data_engine = Rc::new(RefCell::new(data_engine));
146
147 use nautilus_core::WeakCell;
149
150 let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
151 let data_engine_weak_clone1 = data_engine_weak.clone();
152 let endpoint = MessagingSwitchboard::data_engine_execute();
153 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
154 move |cmd: &DataCommand| {
155 if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
156 engine_rc.borrow_mut().execute(cmd);
157 }
158 },
159 )));
160 msgbus::register(endpoint, handler);
161
162 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
164 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
165 move |cmd: &DataCommand| {
166 get_data_cmd_sender().clone().execute(cmd.clone());
167 },
168 )));
169 msgbus::register(endpoint, handler);
170
171 let endpoint = MessagingSwitchboard::data_engine_process();
173 let data_engine_weak2 = data_engine_weak.clone();
174 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
175 move |data: &dyn Any| {
176 if let Some(engine_rc) = data_engine_weak2.upgrade() {
177 engine_rc.borrow_mut().process(data);
178 }
179 },
180 )));
181 msgbus::register(endpoint, handler);
182
183 let endpoint = MessagingSwitchboard::data_engine_response();
185 let data_engine_weak3 = data_engine_weak;
186 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
187 move |resp: &DataResponse| {
188 if let Some(engine_rc) = data_engine_weak3.upgrade() {
189 engine_rc.borrow_mut().response(resp.clone());
190 }
191 },
192 )));
193 msgbus::register(endpoint, handler);
194
195 let trader = Trader::new(
196 config.trader_id(),
197 instance_id,
198 config.environment(),
199 clock.clone(),
200 cache.clone(),
201 );
202
203 let ts_created = clock.borrow().timestamp_ns();
204
205 Ok(Self {
206 name,
207 instance_id,
208 machine_id,
209 config: Box::new(config),
210 cache,
211 clock,
212 portfolio,
213 log_guard,
214 data_engine,
215 risk_engine,
216 exec_engine,
217 trader,
218 ts_created,
219 ts_started: None,
220 ts_shutdown: None,
221 })
222 }
223
224 fn determine_machine_id() -> anyhow::Result<String> {
225 Ok(hostname::get()?.to_string_lossy().into_owned())
226 }
227
228 fn initialize_logging(
229 trader_id: TraderId,
230 instance_id: UUID4,
231 config: LoggerConfig,
232 ) -> anyhow::Result<LogGuard> {
233 init_tracing()?;
234
235 let log_guard = init_logging(
236 trader_id,
237 instance_id,
238 config,
239 FileWriterConfig::default(), )?;
241
242 Ok(log_guard)
243 }
244
245 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
246 match environment {
247 Environment::Backtest => {
248 let test_clock = TestClock::new();
249 Rc::new(RefCell::new(test_clock))
250 }
251 Environment::Live | Environment::Sandbox => {
252 let live_clock = LiveClock::default();
253 Rc::new(RefCell::new(live_clock))
254 }
255 }
256 }
257
258 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
259 let cache_config = cache_config.unwrap_or_default();
260
261 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
263 let cache = Cache::new(Some(cache_config), cache_database);
264
265 Rc::new(RefCell::new(cache))
266 }
267
268 fn cancel_timers(&self) {
269 self.clock.borrow_mut().cancel_timers();
270 }
271
272 #[must_use]
273 pub fn generate_timestamp_ns(&self) -> UnixNanos {
274 self.clock.borrow().timestamp_ns()
275 }
276
277 #[must_use]
279 pub fn environment(&self) -> Environment {
280 self.config.environment()
281 }
282
283 #[must_use]
285 pub const fn name(&self) -> &str {
286 self.name.as_str()
287 }
288
289 #[must_use]
291 pub fn trader_id(&self) -> TraderId {
292 self.config.trader_id()
293 }
294
295 #[must_use]
297 pub fn machine_id(&self) -> &str {
298 &self.machine_id
299 }
300
301 #[must_use]
303 pub const fn instance_id(&self) -> UUID4 {
304 self.instance_id
305 }
306
307 #[must_use]
309 pub const fn ts_created(&self) -> UnixNanos {
310 self.ts_created
311 }
312
313 #[must_use]
315 pub const fn ts_started(&self) -> Option<UnixNanos> {
316 self.ts_started
317 }
318
319 #[must_use]
321 pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
322 self.ts_shutdown
323 }
324
325 #[must_use]
327 pub fn load_state(&self) -> bool {
328 self.config.load_state()
329 }
330
331 #[must_use]
333 pub fn save_state(&self) -> bool {
334 self.config.save_state()
335 }
336
337 #[must_use]
339 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
340 self.clock.clone()
341 }
342
343 #[must_use]
345 pub fn cache(&self) -> Rc<RefCell<Cache>> {
346 self.cache.clone()
347 }
348
349 #[must_use]
351 pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
352 get_message_bus()
353 }
354
355 #[must_use]
357 pub const fn portfolio(&self) -> &Portfolio {
358 &self.portfolio
359 }
360
361 #[must_use]
363 pub fn data_engine(&self) -> Ref<'_, DataEngine> {
364 self.data_engine.borrow()
365 }
366
367 #[must_use]
369 pub const fn risk_engine(&self) -> &RiskEngine {
370 &self.risk_engine
371 }
372
373 #[must_use]
375 pub const fn exec_engine(&self) -> &ExecutionEngine {
376 &self.exec_engine
377 }
378
379 #[must_use]
381 pub const fn trader(&self) -> &Trader {
382 &self.trader
383 }
384
385 pub async fn start_async(&mut self) {
387 log::info!("Starting");
388 self.start_engines();
389
390 log::info!("Initializing trader");
391 if let Err(e) = self.trader.initialize() {
392 log::error!("Error initializing trader: {e:?}");
393 return;
394 }
395
396 log::info!("Connecting clients...");
397 if let Err(e) = self.connect_clients().await {
398 log::error!("Error connecting clients: {e:?}");
399 }
400 log::info!("Clients connected");
401
402 if let Err(e) = self.trader.start() {
403 log::error!("Error starting trader: {e:?}");
404 }
405
406 self.ts_started = Some(self.clock.borrow().timestamp_ns());
407 log::info!("Started");
408 }
409
410 pub async fn stop_async(&mut self) {
412 log::info!("Stopping");
413
414 if let Err(e) = self.trader.stop() {
416 log::error!("Error stopping trader: {e:?}");
417 }
418
419 if let Err(e) = self.disconnect_clients().await {
421 log::error!("Error disconnecting clients: {e:?}");
422 }
423
424 self.stop_engines();
425 self.cancel_timers();
426
427 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
428 log::info!("Stopped");
429 }
430
431 pub fn reset(&mut self) {
433 log::info!("Resetting");
434
435 if let Err(e) = self.trader.reset() {
436 log::error!("Error resetting trader: {e:?}");
437 }
438
439 self.data_engine.borrow_mut().reset();
441 self.ts_started = None;
444 self.ts_shutdown = None;
445
446 log::info!("Reset");
447 }
448
449 pub fn dispose(&mut self) {
451 log::info!("Disposing");
452
453 if let Err(e) = self.trader.dispose() {
454 log::error!("Error disposing trader: {e:?}");
455 }
456
457 self.stop_engines();
458
459 self.data_engine.borrow_mut().dispose();
460 log::info!("Disposed");
463 }
464
465 const fn cancel_all_tasks(&self) {
469 }
471
472 fn start_engines(&self) {
474 self.data_engine.borrow_mut().start();
475 }
477
478 fn stop_engines(&self) {
480 self.data_engine.borrow_mut().stop();
481 }
483
484 #[allow(clippy::await_holding_refcell_ref)]
486 async fn connect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
487 let mut data_engine = self.data_engine.borrow_mut();
488 let mut data_adapters = data_engine.get_clients_mut();
489 let mut futures = Vec::with_capacity(data_adapters.len());
490
491 for adapter in &mut data_adapters {
492 futures.push(adapter.connect());
493 }
494
495 let results = join_all(futures).await;
496 let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
497
498 if errors.is_empty() {
499 Ok(())
500 } else {
501 Err(errors)
502 }
503 }
504
505 #[allow(clippy::await_holding_refcell_ref)]
507 async fn disconnect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
508 let mut data_engine = self.data_engine.borrow_mut();
509 let mut data_adapters = data_engine.get_clients_mut();
510 let mut futures = Vec::with_capacity(data_adapters.len());
511
512 for adapter in &mut data_adapters {
513 futures.push(adapter.disconnect());
514 }
515
516 let results = join_all(futures).await;
517 let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
518
519 if errors.is_empty() {
520 Ok(())
521 } else {
522 Err(errors)
523 }
524 }
525
526 fn stop_clients(&self) {
528 self.data_engine.borrow_mut().stop();
529 }
530
531 const fn initialize_portfolio(&self) {
533 }
535
536 const fn await_engines_connected(&self) {
540 }
542
543 const fn await_execution_reconciliation(&self) {
547 }
549
550 const fn await_portfolio_initialized(&self) {
554 }
556
557 const fn await_trader_residuals(&self) {
561 }
563
564 const fn check_engines_connected(&self) {
566 }
568
569 const fn check_engines_disconnected(&self) {
571 }
573
574 const fn check_portfolio_initialized(&self) {
576 }
578
579 const fn flush_writer(&self) {
581 }
583}