nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell},
19    rc::Rc,
20    time::Duration,
21};
22
23use nautilus_common::{
24    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
25    clock::{Clock, TestClock},
26    component::Component,
27    enums::Environment,
28    logging::{
29        headers, init_logging,
30        logger::{LogGuard, LoggerConfig},
31        writer::FileWriterConfig,
32    },
33    messages::{DataResponse, ExecutionReport, data::DataCommand, execution::TradingCommand},
34    msgbus::{
35        self, MessageBus,
36        handler::{ShareableMessageHandler, TypedMessageHandler},
37        set_message_bus,
38        switchboard::MessagingSwitchboard,
39    },
40    runner::get_data_cmd_sender,
41};
42use nautilus_core::{UUID4, UnixNanos, WeakCell};
43use nautilus_data::engine::DataEngine;
44use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
45use nautilus_model::{events::OrderEventAny, identifiers::TraderId};
46use nautilus_portfolio::portfolio::Portfolio;
47use nautilus_risk::engine::RiskEngine;
48use ustr::Ustr;
49
50use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
51
52/// Core Nautilus system kernel.
53///
54/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
55#[derive(Debug)]
56pub struct NautilusKernel {
57    /// The kernel name (for logging and identification).
58    pub name: String,
59    /// The unique instance identifier for this kernel.
60    pub instance_id: UUID4,
61    /// The machine identifier (hostname or similar).
62    pub machine_id: String,
63    /// The kernel configuration.
64    pub config: Box<dyn NautilusKernelConfig>,
65    /// The shared in-memory cache.
66    pub cache: Rc<RefCell<Cache>>,
67    /// The clock driving the kernel.
68    pub clock: Rc<RefCell<dyn Clock>>,
69    /// The portfolio manager.
70    pub portfolio: Rc<RefCell<Portfolio>>,
71    /// Guard for the logging subsystem (keeps logger thread alive).
72    pub log_guard: LogGuard,
73    /// The data engine instance.
74    pub data_engine: Rc<RefCell<DataEngine>>,
75    /// The risk engine instance.
76    pub risk_engine: Rc<RefCell<RiskEngine>>,
77    /// The execution engine instance.
78    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
79    /// The order emulator for handling emulated orders.
80    pub order_emulator: OrderEmulatorAdapter,
81    /// The trader component.
82    pub trader: Trader,
83    /// The UNIX timestamp (nanoseconds) when the kernel was created.
84    pub ts_created: UnixNanos,
85    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
86    pub ts_started: Option<UnixNanos>,
87    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
88    pub ts_shutdown: Option<UnixNanos>,
89}
90
91impl NautilusKernel {
92    /// Create a new [`NautilusKernelBuilder`] for fluent configuration.
93    #[must_use]
94    pub const fn builder(
95        name: String,
96        trader_id: TraderId,
97        environment: Environment,
98    ) -> NautilusKernelBuilder {
99        NautilusKernelBuilder::new(name, trader_id, environment)
100    }
101
102    /// Create a new [`NautilusKernel`] instance.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the kernel fails to initialize.
107    pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
108        let instance_id = config.instance_id().unwrap_or_default();
109        let machine_id = Self::determine_machine_id()?;
110
111        let logger_config = config.logging();
112        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
113        headers::log_header(
114            config.trader_id(),
115            &machine_id,
116            instance_id,
117            Ustr::from(stringify!(LiveNode)),
118        );
119
120        log::info!("Building system kernel");
121
122        let clock = Self::initialize_clock(&config.environment());
123        let cache = Self::initialize_cache(config.cache());
124
125        let msgbus = Rc::new(RefCell::new(MessageBus::new(
126            config.trader_id(),
127            instance_id,
128            Some(name.clone()),
129            None,
130        )));
131        set_message_bus(msgbus);
132
133        let portfolio = Rc::new(RefCell::new(Portfolio::new(
134            cache.clone(),
135            clock.clone(),
136            config.portfolio(),
137        )));
138
139        let risk_engine = RiskEngine::new(
140            config.risk_engine().unwrap_or_default(),
141            portfolio.borrow().clone_shallow(),
142            clock.clone(),
143            cache.clone(),
144        );
145        let risk_engine = Rc::new(RefCell::new(risk_engine));
146
147        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
148        let exec_engine = Rc::new(RefCell::new(exec_engine));
149
150        // Create order emulator (auto-registers message handlers)
151        let order_emulator = OrderEmulatorAdapter::new(clock.clone(), cache.clone());
152
153        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
154        let data_engine = Rc::new(RefCell::new(data_engine));
155
156        // Register DataEngine command execution
157
158        let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
159        let data_engine_weak_clone1 = data_engine_weak.clone();
160        let endpoint = MessagingSwitchboard::data_engine_execute();
161        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
162            move |cmd: &DataCommand| {
163                if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
164                    engine_rc.borrow_mut().execute(cmd);
165                }
166            },
167        )));
168        msgbus::register(endpoint, handler);
169
170        // Register DataEngine command queueing
171        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
172        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
173            move |cmd: &DataCommand| {
174                get_data_cmd_sender().clone().execute(cmd.clone());
175            },
176        )));
177        msgbus::register(endpoint, handler);
178
179        // Register DataEngine process handler
180        let endpoint = MessagingSwitchboard::data_engine_process();
181        let data_engine_weak2 = data_engine_weak.clone();
182        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
183            move |data: &dyn Any| {
184                if let Some(engine_rc) = data_engine_weak2.upgrade() {
185                    engine_rc.borrow_mut().process(data);
186                }
187            },
188        )));
189        msgbus::register(endpoint, handler);
190
191        // Register DataEngine response handler
192        let endpoint = MessagingSwitchboard::data_engine_response();
193        let data_engine_weak3 = data_engine_weak;
194        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
195            move |resp: &DataResponse| {
196                if let Some(engine_rc) = data_engine_weak3.upgrade() {
197                    engine_rc.borrow_mut().response(resp.clone());
198                }
199            },
200        )));
201        msgbus::register(endpoint, handler);
202
203        // Register RiskEngine execute handler
204        let risk_engine_weak = WeakCell::from(Rc::downgrade(&risk_engine));
205        let endpoint = MessagingSwitchboard::risk_engine_execute();
206        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
207            move |cmd: &TradingCommand| {
208                if let Some(engine_rc) = risk_engine_weak.upgrade() {
209                    engine_rc.borrow_mut().execute(cmd.clone());
210                }
211            },
212        )));
213        msgbus::register(endpoint, handler);
214
215        // Register ExecEngine execute handler
216        let exec_engine_weak1 = WeakCell::from(Rc::downgrade(&exec_engine));
217        let endpoint = MessagingSwitchboard::exec_engine_execute();
218        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
219            move |cmd: &TradingCommand| {
220                if let Some(engine_rc) = exec_engine_weak1.upgrade() {
221                    engine_rc.borrow().execute(cmd);
222                }
223            },
224        )));
225        msgbus::register(endpoint, handler);
226
227        // Register ExecEngine process handler
228        let exec_engine_weak2 = WeakCell::from(Rc::downgrade(&exec_engine));
229        let endpoint = MessagingSwitchboard::exec_engine_process();
230        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
231            move |event: &OrderEventAny| {
232                if let Some(engine_rc) = exec_engine_weak2.upgrade() {
233                    engine_rc.borrow_mut().process(event);
234                }
235            },
236        )));
237        msgbus::register(endpoint, handler);
238
239        // Register ExecEngine report handler
240        let exec_engine_weak3 = WeakCell::from(Rc::downgrade(&exec_engine));
241        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
242        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
243            move |report: &ExecutionReport| {
244                if let Some(engine_rc) = exec_engine_weak3.upgrade() {
245                    engine_rc.borrow_mut().reconcile_execution_report(report);
246                }
247            },
248        )));
249        msgbus::register(endpoint, handler);
250
251        let trader = Trader::new(
252            config.trader_id(),
253            instance_id,
254            config.environment(),
255            clock.clone(),
256            cache.clone(),
257            portfolio.clone(),
258        );
259
260        let ts_created = clock.borrow().timestamp_ns();
261
262        Ok(Self {
263            name,
264            instance_id,
265            machine_id,
266            config: Box::new(config),
267            cache,
268            clock,
269            portfolio,
270            log_guard,
271            data_engine,
272            risk_engine,
273            exec_engine,
274            order_emulator,
275            trader,
276            ts_created,
277            ts_started: None,
278            ts_shutdown: None,
279        })
280    }
281
282    fn determine_machine_id() -> anyhow::Result<String> {
283        sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
284    }
285
286    fn initialize_logging(
287        trader_id: TraderId,
288        instance_id: UUID4,
289        config: LoggerConfig,
290    ) -> anyhow::Result<LogGuard> {
291        let log_guard = init_logging(
292            trader_id,
293            instance_id,
294            config,
295            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
296        )?;
297
298        Ok(log_guard)
299    }
300
301    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
302        match environment {
303            Environment::Backtest => {
304                let test_clock = TestClock::new();
305                Rc::new(RefCell::new(test_clock))
306            }
307            #[cfg(feature = "live")]
308            Environment::Live | Environment::Sandbox => {
309                let live_clock = nautilus_common::live::clock::LiveClock::default(); // nautilus-import-ok
310                Rc::new(RefCell::new(live_clock))
311            }
312            #[cfg(not(feature = "live"))]
313            Environment::Live | Environment::Sandbox => {
314                panic!(
315                    "Live/Sandbox environment requires the 'live' feature to be enabled. \
316                     Build with `--features live` or add `features = [\"live\"]` to your dependency."
317                );
318            }
319        }
320    }
321
322    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
323        let cache_config = cache_config.unwrap_or_default();
324
325        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
326        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
327        let cache = Cache::new(Some(cache_config), cache_database);
328
329        Rc::new(RefCell::new(cache))
330    }
331
332    fn cancel_timers(&self) {
333        self.clock.borrow_mut().cancel_timers();
334    }
335
336    #[must_use]
337    pub fn generate_timestamp_ns(&self) -> UnixNanos {
338        self.clock.borrow().timestamp_ns()
339    }
340
341    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
342    #[must_use]
343    pub fn environment(&self) -> Environment {
344        self.config.environment()
345    }
346
347    /// Returns the kernel's name.
348    #[must_use]
349    pub const fn name(&self) -> &str {
350        self.name.as_str()
351    }
352
353    /// Returns the kernel's trader ID.
354    #[must_use]
355    pub fn trader_id(&self) -> TraderId {
356        self.config.trader_id()
357    }
358
359    /// Returns the kernel's machine ID.
360    #[must_use]
361    pub fn machine_id(&self) -> &str {
362        &self.machine_id
363    }
364
365    /// Returns the kernel's instance ID.
366    #[must_use]
367    pub const fn instance_id(&self) -> UUID4 {
368        self.instance_id
369    }
370
371    /// Returns the delay after stopping the node to await residual events before final shutdown.
372    #[must_use]
373    pub fn delay_post_stop(&self) -> Duration {
374        self.config.delay_post_stop()
375    }
376
377    /// Returns the UNIX timestamp (ns) when the kernel was created.
378    #[must_use]
379    pub const fn ts_created(&self) -> UnixNanos {
380        self.ts_created
381    }
382
383    /// Returns the UNIX timestamp (ns) when the kernel was last started.
384    #[must_use]
385    pub const fn ts_started(&self) -> Option<UnixNanos> {
386        self.ts_started
387    }
388
389    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
390    #[must_use]
391    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
392        self.ts_shutdown
393    }
394
395    /// Returns whether the kernel has been configured to load state.
396    #[must_use]
397    pub fn load_state(&self) -> bool {
398        self.config.load_state()
399    }
400
401    /// Returns whether the kernel has been configured to save state.
402    #[must_use]
403    pub fn save_state(&self) -> bool {
404        self.config.save_state()
405    }
406
407    /// Returns the kernel's clock.
408    #[must_use]
409    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
410        self.clock.clone()
411    }
412
413    /// Returns the kernel's cache.
414    #[must_use]
415    pub fn cache(&self) -> Rc<RefCell<Cache>> {
416        self.cache.clone()
417    }
418
419    /// Returns the kernel's portfolio.
420    #[must_use]
421    pub fn portfolio(&self) -> Ref<'_, Portfolio> {
422        self.portfolio.borrow()
423    }
424
425    /// Returns the kernel's data engine.
426    #[must_use]
427    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
428        self.data_engine.borrow()
429    }
430
431    /// Returns the kernel's risk engine.
432    #[must_use]
433    pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
434        &self.risk_engine
435    }
436
437    /// Returns the kernel's execution engine.
438    #[must_use]
439    pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
440        &self.exec_engine
441    }
442
443    /// Returns the kernel's trader.
444    #[must_use]
445    pub const fn trader(&self) -> &Trader {
446        &self.trader
447    }
448
449    /// Starts the Nautilus system kernel.
450    pub async fn start_async(&mut self) {
451        log::info!("Starting");
452        self.start_engines();
453
454        log::info!("Initializing trader");
455        if let Err(e) = self.trader.initialize() {
456            log::error!("Error initializing trader: {e:?}");
457            return;
458        }
459
460        log::info!("Starting clients...");
461        if let Err(e) = self.start_clients() {
462            log::error!("Error starting clients: {e:?}");
463        }
464        log::info!("Clients started");
465
466        self.ts_started = Some(self.clock.borrow().timestamp_ns());
467        log::info!("Started");
468    }
469
470    /// Starts the trader (strategies and actors).
471    ///
472    /// This should be called after clients are connected and instruments are cached.
473    pub fn start_trader(&mut self) {
474        log::info!("Starting trader...");
475        if let Err(e) = self.trader.start() {
476            log::error!("Error starting trader: {e:?}");
477        }
478        log::info!("Trader started");
479    }
480
481    /// Stops the trader and its registered components.
482    ///
483    /// This method initiates a graceful shutdown of trading components (strategies, actors)
484    /// which may trigger residual events such as order cancellations. The caller should
485    /// continue processing events after calling this method to handle these residual events.
486    pub fn stop_trader(&mut self) {
487        log::info!("Stopping");
488
489        // Stop the trader (it will stop all registered components)
490        if let Err(e) = self.trader.stop() {
491            log::error!("Error stopping trader: {e:?}");
492        }
493    }
494
495    /// Finalizes the kernel shutdown after the grace period.
496    ///
497    /// This method should be called after the residual events grace period has elapsed
498    /// and all remaining events have been processed. It disconnects clients and stops engines.
499    pub async fn finalize_stop(&mut self) {
500        // Stop all adapter clients
501        if let Err(e) = self.stop_all_clients() {
502            log::error!("Error stopping clients: {e:?}");
503        }
504
505        self.stop_engines();
506        self.cancel_timers();
507
508        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
509        log::info!("Stopped");
510    }
511
512    /// Resets the Nautilus system kernel to its initial state.
513    pub fn reset(&mut self) {
514        log::info!("Resetting");
515
516        if let Err(e) = self.trader.reset() {
517            log::error!("Error resetting trader: {e:?}");
518        }
519
520        self.data_engine.borrow_mut().reset();
521        self.exec_engine.borrow_mut().reset();
522        self.risk_engine.borrow_mut().reset();
523
524        self.ts_started = None;
525        self.ts_shutdown = None;
526
527        log::info!("Reset");
528    }
529
530    /// Disposes of the Nautilus system kernel, releasing resources.
531    pub fn dispose(&mut self) {
532        log::info!("Disposing");
533
534        if let Err(e) = self.trader.dispose() {
535            log::error!("Error disposing trader: {e:?}");
536        }
537
538        self.stop_engines();
539
540        self.data_engine.borrow_mut().dispose();
541        self.exec_engine.borrow_mut().dispose();
542        self.risk_engine.borrow_mut().dispose();
543
544        log::info!("Disposed");
545    }
546
547    /// Starts all engine components.
548    fn start_engines(&self) {
549        self.data_engine.borrow_mut().start();
550        self.exec_engine.borrow_mut().start();
551        self.risk_engine.borrow_mut().start();
552    }
553
554    /// Stops all engine components.
555    fn stop_engines(&self) {
556        self.data_engine.borrow_mut().stop();
557        self.exec_engine.borrow_mut().stop();
558        self.risk_engine.borrow_mut().stop();
559    }
560
561    /// Starts all engine clients.
562    ///
563    /// Note: Async connection (connect/disconnect) is handled by LiveNode for live clients.
564    /// This method only handles synchronous start operations on execution clients.
565    fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
566        let mut errors = Vec::new();
567
568        {
569            let mut exec_engine = self.exec_engine.borrow_mut();
570            let exec_adapters = exec_engine.get_clients_mut();
571
572            for adapter in exec_adapters {
573                if let Err(e) = adapter.start() {
574                    log::error!("Error starting execution client {}: {e}", adapter.client_id);
575                    errors.push(e);
576                }
577            }
578        }
579
580        if errors.is_empty() {
581            Ok(())
582        } else {
583            Err(errors)
584        }
585    }
586
587    /// Stops all engine clients.
588    ///
589    /// Note: Async disconnection is handled by LiveNode for live clients.
590    /// This method only handles synchronous stop operations on execution clients.
591    fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
592        let mut errors = Vec::new();
593
594        {
595            let mut exec_engine = self.exec_engine.borrow_mut();
596            let exec_adapters = exec_engine.get_clients_mut();
597
598            for adapter in exec_adapters {
599                if let Err(e) = adapter.stop() {
600                    log::error!("Error stopping execution client {}: {e}", adapter.client_id);
601                    errors.push(e);
602                }
603            }
604        }
605
606        if errors.is_empty() {
607            Ok(())
608        } else {
609            Err(errors)
610        }
611    }
612
613    /// Connects all engine clients.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if any client fails to connect.
618    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
619    pub async fn connect_clients(&mut self) -> anyhow::Result<()> {
620        log::info!("Connecting clients...");
621        self.data_engine.borrow_mut().connect().await?;
622        self.exec_engine.borrow_mut().connect().await?;
623        Ok(())
624    }
625
626    /// Disconnects all engine clients.
627    ///
628    /// # Errors
629    ///
630    /// Returns an error if any client fails to disconnect.
631    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
632    pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
633        log::info!("Disconnecting clients...");
634        self.data_engine.borrow_mut().disconnect().await?;
635        self.exec_engine.borrow_mut().disconnect().await?;
636        Ok(())
637    }
638
639    /// Returns `true` if all engine clients are connected.
640    #[must_use]
641    pub fn check_engines_connected(&self) -> bool {
642        self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
643    }
644
645    /// Returns `true` if all engine clients are disconnected.
646    #[must_use]
647    pub fn check_engines_disconnected(&self) -> bool {
648        self.data_engine.borrow().check_disconnected()
649            && self.exec_engine.borrow().check_disconnected()
650    }
651}