Skip to main content

nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::{Ref, RefCell},
18    rc::Rc,
19    time::Duration,
20};
21
22use nautilus_common::{
23    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
24    clock::{Clock, TestClock},
25    component::Component,
26    enums::Environment,
27    logging::{
28        headers, init_logging,
29        logger::{LogGuard, LoggerConfig},
30        writer::FileWriterConfig,
31    },
32    msgbus::{MessageBus, set_message_bus},
33};
34use nautilus_core::{UUID4, UnixNanos};
35use nautilus_data::engine::DataEngine;
36use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
37use nautilus_model::identifiers::{ClientId, TraderId};
38use nautilus_portfolio::portfolio::Portfolio;
39use nautilus_risk::engine::RiskEngine;
40use ustr::Ustr;
41
42use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
43
44/// Core Nautilus system kernel.
45///
46/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
47#[derive(Debug)]
48pub struct NautilusKernel {
49    /// The kernel name (for logging and identification).
50    pub name: String,
51    /// The unique instance identifier for this kernel.
52    pub instance_id: UUID4,
53    /// The machine identifier (hostname or similar).
54    pub machine_id: String,
55    /// The kernel configuration.
56    pub config: Box<dyn NautilusKernelConfig>,
57    /// The shared in-memory cache.
58    pub cache: Rc<RefCell<Cache>>,
59    /// The clock driving the kernel.
60    pub clock: Rc<RefCell<dyn Clock>>,
61    /// The portfolio manager.
62    pub portfolio: Rc<RefCell<Portfolio>>,
63    /// Guard for the logging subsystem (keeps logger thread alive).
64    pub log_guard: LogGuard,
65    /// The data engine instance.
66    pub data_engine: Rc<RefCell<DataEngine>>,
67    /// The risk engine instance.
68    pub risk_engine: Rc<RefCell<RiskEngine>>,
69    /// The execution engine instance.
70    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
71    /// The order emulator for handling emulated orders.
72    pub order_emulator: OrderEmulatorAdapter,
73    /// The trader component.
74    pub trader: Trader,
75    /// The UNIX timestamp (nanoseconds) when the kernel was created.
76    pub ts_created: UnixNanos,
77    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
78    pub ts_started: Option<UnixNanos>,
79    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
80    pub ts_shutdown: Option<UnixNanos>,
81}
82
83impl NautilusKernel {
84    /// Create a new [`NautilusKernelBuilder`] for fluent configuration.
85    #[must_use]
86    pub const fn builder(
87        name: String,
88        trader_id: TraderId,
89        environment: Environment,
90    ) -> NautilusKernelBuilder {
91        NautilusKernelBuilder::new(name, trader_id, environment)
92    }
93
94    /// Create a new [`NautilusKernel`] instance.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the kernel fails to initialize.
99    pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
100        let instance_id = config.instance_id().unwrap_or_default();
101        let machine_id = Self::determine_machine_id()?;
102
103        let logger_config = config.logging();
104        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
105        headers::log_header(
106            config.trader_id(),
107            &machine_id,
108            instance_id,
109            Ustr::from(stringify!(LiveNode)),
110        );
111
112        log::info!("Building system kernel");
113
114        let clock = Self::initialize_clock(&config.environment());
115        let cache = Self::initialize_cache(config.cache());
116
117        let msgbus = Rc::new(RefCell::new(MessageBus::new(
118            config.trader_id(),
119            instance_id,
120            Some(name.clone()),
121            None,
122        )));
123        set_message_bus(msgbus);
124
125        let portfolio = Rc::new(RefCell::new(Portfolio::new(
126            cache.clone(),
127            clock.clone(),
128            config.portfolio(),
129        )));
130
131        let risk_engine = RiskEngine::new(
132            config.risk_engine().unwrap_or_default(),
133            portfolio.borrow().clone_shallow(),
134            clock.clone(),
135            cache.clone(),
136        );
137        let risk_engine = Rc::new(RefCell::new(risk_engine));
138
139        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
140        let exec_engine = Rc::new(RefCell::new(exec_engine));
141
142        let order_emulator =
143            OrderEmulatorAdapter::new(config.trader_id(), clock.clone(), cache.clone());
144
145        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
146        let data_engine = Rc::new(RefCell::new(data_engine));
147
148        DataEngine::register_msgbus_handlers(data_engine.clone());
149        RiskEngine::register_msgbus_handlers(risk_engine.clone());
150        ExecutionEngine::register_msgbus_handlers(exec_engine.clone());
151
152        let trader = Trader::new(
153            config.trader_id(),
154            instance_id,
155            config.environment(),
156            clock.clone(),
157            cache.clone(),
158            portfolio.clone(),
159        );
160
161        let ts_created = clock.borrow().timestamp_ns();
162
163        Ok(Self {
164            name,
165            instance_id,
166            machine_id,
167            config: Box::new(config),
168            cache,
169            clock,
170            portfolio,
171            log_guard,
172            data_engine,
173            risk_engine,
174            exec_engine,
175            order_emulator,
176            trader,
177            ts_created,
178            ts_started: None,
179            ts_shutdown: None,
180        })
181    }
182
183    fn determine_machine_id() -> anyhow::Result<String> {
184        sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
185    }
186
187    fn initialize_logging(
188        trader_id: TraderId,
189        instance_id: UUID4,
190        config: LoggerConfig,
191    ) -> anyhow::Result<LogGuard> {
192        #[cfg(feature = "tracing-bridge")]
193        let use_tracing = config.use_tracing;
194
195        let log_guard = match init_logging(
196            trader_id,
197            instance_id,
198            config,
199            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
200        ) {
201            Ok(guard) => guard,
202            Err(e) => {
203                // Only recover from SetLoggerError (logger already registered).
204                // This is common in tests where multiple kernels are created and
205                // the log crate's global logger persists after LogGuard teardown.
206                // Any other error (e.g. thread spawn failure) is propagated.
207                if e.downcast_ref::<log::SetLoggerError>().is_some() {
208                    if let Some(guard) = LogGuard::new() {
209                        guard
210                    } else {
211                        return Err(e.context(
212                            "A non-Nautilus logger is already registered; \
213                             cannot initialize Nautilus logging",
214                        ));
215                    }
216                } else {
217                    return Err(e);
218                }
219            }
220        };
221
222        // Initialize tracing subscriber if enabled (idempotent)
223        #[cfg(feature = "tracing-bridge")]
224        if use_tracing && !nautilus_common::logging::bridge::tracing_is_initialized() {
225            nautilus_common::logging::bridge::init_tracing()?;
226        }
227
228        Ok(log_guard)
229    }
230
231    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
232        match environment {
233            Environment::Backtest => {
234                let test_clock = TestClock::new();
235                Rc::new(RefCell::new(test_clock))
236            }
237            #[cfg(feature = "live")]
238            Environment::Live | Environment::Sandbox => {
239                let live_clock = nautilus_common::live::clock::LiveClock::default(); // nautilus-import-ok
240                Rc::new(RefCell::new(live_clock))
241            }
242            #[cfg(not(feature = "live"))]
243            Environment::Live | Environment::Sandbox => {
244                panic!(
245                    "Live/Sandbox environment requires the 'live' feature to be enabled. \
246                     Build with `--features live` or add `features = [\"live\"]` to your dependency."
247                );
248            }
249        }
250    }
251
252    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
253        let cache_config = cache_config.unwrap_or_default();
254
255        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
256        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
257        let cache = Cache::new(Some(cache_config), cache_database);
258
259        Rc::new(RefCell::new(cache))
260    }
261
262    fn cancel_timers(&self) {
263        self.clock.borrow_mut().cancel_timers();
264    }
265
266    #[must_use]
267    pub fn generate_timestamp_ns(&self) -> UnixNanos {
268        self.clock.borrow().timestamp_ns()
269    }
270
271    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
272    #[must_use]
273    pub fn environment(&self) -> Environment {
274        self.config.environment()
275    }
276
277    /// Returns the kernel's name.
278    #[must_use]
279    pub const fn name(&self) -> &str {
280        self.name.as_str()
281    }
282
283    /// Returns the kernel's trader ID.
284    #[must_use]
285    pub fn trader_id(&self) -> TraderId {
286        self.config.trader_id()
287    }
288
289    /// Returns the kernel's machine ID.
290    #[must_use]
291    pub fn machine_id(&self) -> &str {
292        &self.machine_id
293    }
294
295    /// Returns the kernel's instance ID.
296    #[must_use]
297    pub const fn instance_id(&self) -> UUID4 {
298        self.instance_id
299    }
300
301    /// Returns the delay after stopping the node to await residual events before final shutdown.
302    #[must_use]
303    pub fn delay_post_stop(&self) -> Duration {
304        self.config.delay_post_stop()
305    }
306
307    /// Returns the UNIX timestamp (ns) when the kernel was created.
308    #[must_use]
309    pub const fn ts_created(&self) -> UnixNanos {
310        self.ts_created
311    }
312
313    /// Returns the UNIX timestamp (ns) when the kernel was last started.
314    #[must_use]
315    pub const fn ts_started(&self) -> Option<UnixNanos> {
316        self.ts_started
317    }
318
319    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
320    #[must_use]
321    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
322        self.ts_shutdown
323    }
324
325    /// Returns whether the kernel has been configured to load state.
326    #[must_use]
327    pub fn load_state(&self) -> bool {
328        self.config.load_state()
329    }
330
331    /// Returns whether the kernel has been configured to save state.
332    #[must_use]
333    pub fn save_state(&self) -> bool {
334        self.config.save_state()
335    }
336
337    /// Returns the kernel's clock.
338    #[must_use]
339    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
340        self.clock.clone()
341    }
342
343    /// Returns the kernel's cache.
344    #[must_use]
345    pub fn cache(&self) -> Rc<RefCell<Cache>> {
346        self.cache.clone()
347    }
348
349    /// Returns the kernel's portfolio.
350    #[must_use]
351    pub fn portfolio(&self) -> Ref<'_, Portfolio> {
352        self.portfolio.borrow()
353    }
354
355    /// Returns the kernel's data engine.
356    #[must_use]
357    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
358        self.data_engine.borrow()
359    }
360
361    /// Returns the kernel's risk engine.
362    #[must_use]
363    pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
364        &self.risk_engine
365    }
366
367    /// Returns the kernel's execution engine.
368    #[must_use]
369    pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
370        &self.exec_engine
371    }
372
373    /// Returns the kernel's trader.
374    #[must_use]
375    pub const fn trader(&self) -> &Trader {
376        &self.trader
377    }
378
379    /// Starts the Nautilus system kernel synchronously (for backtest use).
380    pub fn start(&mut self) {
381        log::info!("Starting");
382        self.start_engines();
383
384        log::info!("Initializing trader");
385        if let Err(e) = self.trader.initialize() {
386            log::error!("Error initializing trader: {e:?}");
387            return;
388        }
389
390        log::info!("Starting clients...");
391        if let Err(e) = self.start_clients() {
392            log::error!("Error starting clients: {e:?}");
393        }
394        log::info!("Clients started");
395
396        self.ts_started = Some(self.clock.borrow().timestamp_ns());
397        log::info!("Started");
398    }
399
400    /// Starts the Nautilus system kernel asynchronously.
401    pub async fn start_async(&mut self) {
402        self.start();
403    }
404
405    /// Starts the trader (strategies and actors).
406    ///
407    /// This should be called after clients are connected and instruments are cached.
408    pub fn start_trader(&mut self) {
409        log::info!("Starting trader...");
410        if let Err(e) = self.trader.start() {
411            log::error!("Error starting trader: {e:?}");
412        }
413        log::info!("Trader started");
414    }
415
416    /// Stops the trader and its registered components.
417    ///
418    /// This method initiates a graceful shutdown of trading components (strategies, actors)
419    /// which may trigger residual events such as order cancellations. The caller should
420    /// continue processing events after calling this method to handle these residual events.
421    pub fn stop_trader(&mut self) {
422        if !self.trader.is_running() {
423            return;
424        }
425
426        log::info!("Stopping trader...");
427
428        if let Err(e) = self.trader.stop() {
429            log::error!("Error stopping trader: {e}");
430        }
431    }
432
433    /// Finalizes the kernel shutdown after the grace period.
434    ///
435    /// This method should be called after the residual events grace period has elapsed
436    /// and all remaining events have been processed. It disconnects clients and stops engines.
437    pub async fn finalize_stop(&mut self) {
438        // Stop all adapter clients
439        if let Err(e) = self.stop_all_clients() {
440            log::error!("Error stopping clients: {e:?}");
441        }
442
443        self.stop_engines();
444        self.cancel_timers();
445
446        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
447        log::info!("Stopped");
448    }
449
450    /// Resets the Nautilus system kernel to its initial state.
451    pub fn reset(&mut self) {
452        log::info!("Resetting");
453
454        if let Err(e) = self.trader.reset() {
455            log::error!("Error resetting trader: {e:?}");
456        }
457
458        self.data_engine.borrow_mut().reset();
459        self.exec_engine.borrow_mut().reset();
460        self.risk_engine.borrow_mut().reset();
461
462        self.ts_started = None;
463        self.ts_shutdown = None;
464
465        log::info!("Reset");
466    }
467
468    /// Disposes of the Nautilus system kernel, releasing resources.
469    pub fn dispose(&mut self) {
470        log::info!("Disposing");
471
472        if let Err(e) = self.trader.dispose() {
473            log::error!("Error disposing trader: {e:?}");
474        }
475
476        self.stop_engines();
477
478        self.data_engine.borrow_mut().dispose();
479        self.exec_engine.borrow_mut().dispose();
480        self.risk_engine.borrow_mut().dispose();
481
482        log::info!("Disposed");
483    }
484
485    /// Starts all engine components.
486    fn start_engines(&self) {
487        self.data_engine.borrow_mut().start();
488        self.exec_engine.borrow_mut().start();
489        self.risk_engine.borrow_mut().start();
490    }
491
492    /// Stops all engine components.
493    fn stop_engines(&self) {
494        self.data_engine.borrow_mut().stop();
495        self.exec_engine.borrow_mut().stop();
496        self.risk_engine.borrow_mut().stop();
497    }
498
499    /// Starts all engine clients.
500    ///
501    /// Note: Async connection (connect/disconnect) is handled by LiveNode for live clients.
502    /// This method only handles synchronous start operations on execution clients.
503    fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
504        let mut errors = Vec::new();
505
506        {
507            let mut exec_engine = self.exec_engine.borrow_mut();
508            let exec_adapters = exec_engine.get_clients_mut();
509
510            for adapter in exec_adapters {
511                if let Err(e) = adapter.start() {
512                    log::error!("Error starting execution client {}: {e}", adapter.client_id);
513                    errors.push(e);
514                }
515            }
516        }
517
518        if errors.is_empty() {
519            Ok(())
520        } else {
521            Err(errors)
522        }
523    }
524
525    /// Stops all engine clients.
526    ///
527    /// Note: Async disconnection is handled by LiveNode for live clients.
528    /// This method only handles synchronous stop operations on execution clients.
529    fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
530        let mut errors = Vec::new();
531
532        {
533            let mut exec_engine = self.exec_engine.borrow_mut();
534            let exec_adapters = exec_engine.get_clients_mut();
535
536            for adapter in exec_adapters {
537                if let Err(e) = adapter.stop() {
538                    log::error!("Error stopping execution client {}: {e}", adapter.client_id);
539                    errors.push(e);
540                }
541            }
542        }
543
544        if errors.is_empty() {
545            Ok(())
546        } else {
547            Err(errors)
548        }
549    }
550
551    /// Connects all engine clients.
552    ///
553    /// Connection failures are logged but do not prevent the node from running.
554    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
555    pub async fn connect_clients(&mut self) {
556        log::info!("Connecting clients...");
557        self.data_engine.borrow_mut().connect().await;
558        self.exec_engine.borrow_mut().connect().await;
559    }
560
561    /// Disconnects all engine clients.
562    ///
563    /// # Errors
564    ///
565    /// Returns an error if any client fails to disconnect.
566    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
567    pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
568        log::info!("Disconnecting clients...");
569        self.data_engine.borrow_mut().disconnect().await?;
570        self.exec_engine.borrow_mut().disconnect().await?;
571        Ok(())
572    }
573
574    /// Returns `true` if all engine clients are connected.
575    #[must_use]
576    pub fn check_engines_connected(&self) -> bool {
577        self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
578    }
579
580    /// Returns `true` if all engine clients are disconnected.
581    #[must_use]
582    pub fn check_engines_disconnected(&self) -> bool {
583        self.data_engine.borrow().check_disconnected()
584            && self.exec_engine.borrow().check_disconnected()
585    }
586
587    /// Returns connection status for all data clients.
588    #[must_use]
589    pub fn data_client_connection_status(&self) -> Vec<(ClientId, bool)> {
590        self.data_engine.borrow().client_connection_status()
591    }
592
593    /// Returns connection status for all execution clients.
594    #[must_use]
595    pub fn exec_client_connection_status(&self) -> Vec<(ClientId, bool)> {
596        self.exec_engine.borrow().client_connection_status()
597    }
598}