nautilus_system/
kernel.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    any::Any,
22    cell::{Ref, RefCell},
23    rc::Rc,
24};
25
26#[cfg(feature = "live")]
27use nautilus_common::live::clock::LiveClock;
28use nautilus_common::{
29    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
30    clock::{Clock, TestClock},
31    component::Component,
32    enums::Environment,
33    logging::{
34        headers, init_logging, init_tracing,
35        logger::{LogGuard, LoggerConfig},
36        writer::FileWriterConfig,
37    },
38    messages::{DataResponse, data::DataCommand, execution::TradingCommand},
39    msgbus::{
40        self, MessageBus, get_message_bus,
41        handler::{ShareableMessageHandler, TypedMessageHandler},
42        set_message_bus,
43        switchboard::MessagingSwitchboard,
44    },
45    runner::get_data_cmd_sender,
46};
47use nautilus_core::{UUID4, UnixNanos};
48use nautilus_data::engine::DataEngine;
49use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
50use nautilus_model::{events::OrderEventAny, identifiers::TraderId};
51use nautilus_portfolio::portfolio::Portfolio;
52use nautilus_risk::engine::RiskEngine;
53use ustr::Ustr;
54
55use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
56
57/// Core Nautilus system kernel.
58///
59/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
60#[derive(Debug)]
61pub struct NautilusKernel {
62    /// The kernel name (for logging and identification).
63    pub name: String,
64    /// The unique instance identifier for this kernel.
65    pub instance_id: UUID4,
66    /// The machine identifier (hostname or similar).
67    pub machine_id: String,
68    /// The kernel configuration.
69    pub config: Box<dyn NautilusKernelConfig>,
70    /// The shared in-memory cache.
71    pub cache: Rc<RefCell<Cache>>,
72    /// The clock driving the kernel.
73    pub clock: Rc<RefCell<dyn Clock>>,
74    /// The portfolio manager.
75    pub portfolio: Rc<RefCell<Portfolio>>,
76    /// Guard for the logging subsystem (keeps logger thread alive).
77    pub log_guard: LogGuard,
78    /// The data engine instance.
79    pub data_engine: Rc<RefCell<DataEngine>>,
80    /// The risk engine instance.
81    pub risk_engine: Rc<RefCell<RiskEngine>>,
82    /// The execution engine instance.
83    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
84    /// The order emulator for handling emulated orders.
85    pub order_emulator: OrderEmulatorAdapter,
86    /// The trader component.
87    pub trader: Trader,
88    /// The UNIX timestamp (nanoseconds) when the kernel was created.
89    pub ts_created: UnixNanos,
90    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
91    pub ts_started: Option<UnixNanos>,
92    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
93    pub ts_shutdown: Option<UnixNanos>,
94}
95
96impl NautilusKernel {
97    /// Create a new [`NautilusKernelBuilder`] for fluent configuration.
98    #[must_use]
99    pub const fn builder(
100        name: String,
101        trader_id: TraderId,
102        environment: Environment,
103    ) -> NautilusKernelBuilder {
104        NautilusKernelBuilder::new(name, trader_id, environment)
105    }
106
107    /// Create a new [`NautilusKernel`] instance.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if the kernel fails to initialize.
112    pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
113        let instance_id = config.instance_id().unwrap_or_default();
114        let machine_id = Self::determine_machine_id()?;
115
116        let logger_config = config.logging();
117        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
118        headers::log_header(
119            config.trader_id(),
120            &machine_id,
121            instance_id,
122            Ustr::from(stringify!(LiveNode)),
123        );
124
125        log::info!("Building system kernel");
126
127        let clock = Self::initialize_clock(&config.environment());
128        let cache = Self::initialize_cache(config.cache());
129
130        let msgbus = Rc::new(RefCell::new(MessageBus::new(
131            config.trader_id(),
132            instance_id,
133            Some(name.clone()),
134            None,
135        )));
136        set_message_bus(msgbus);
137
138        let portfolio = Rc::new(RefCell::new(Portfolio::new(
139            cache.clone(),
140            clock.clone(),
141            config.portfolio(),
142        )));
143
144        let risk_engine = RiskEngine::new(
145            config.risk_engine().unwrap_or_default(),
146            Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
147            clock.clone(),
148            cache.clone(),
149        );
150        let risk_engine = Rc::new(RefCell::new(risk_engine));
151
152        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
153        let exec_engine = Rc::new(RefCell::new(exec_engine));
154
155        // Create order emulator (auto-registers message handlers)
156        let order_emulator = OrderEmulatorAdapter::new(clock.clone(), cache.clone());
157
158        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
159        let data_engine = Rc::new(RefCell::new(data_engine));
160
161        // Register DataEngine command execution
162        use nautilus_core::WeakCell;
163
164        let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
165        let data_engine_weak_clone1 = data_engine_weak.clone();
166        let endpoint = MessagingSwitchboard::data_engine_execute();
167        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
168            move |cmd: &DataCommand| {
169                if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
170                    engine_rc.borrow_mut().execute(cmd);
171                }
172            },
173        )));
174        msgbus::register(endpoint, handler);
175
176        // Register DataEngine command queueing
177        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
178        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
179            move |cmd: &DataCommand| {
180                get_data_cmd_sender().clone().execute(cmd.clone());
181            },
182        )));
183        msgbus::register(endpoint, handler);
184
185        // Register DataEngine process handler
186        let endpoint = MessagingSwitchboard::data_engine_process();
187        let data_engine_weak2 = data_engine_weak.clone();
188        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
189            move |data: &dyn Any| {
190                if let Some(engine_rc) = data_engine_weak2.upgrade() {
191                    engine_rc.borrow_mut().process(data);
192                }
193            },
194        )));
195        msgbus::register(endpoint, handler);
196
197        // Register DataEngine response handler
198        let endpoint = MessagingSwitchboard::data_engine_response();
199        let data_engine_weak3 = data_engine_weak;
200        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
201            move |resp: &DataResponse| {
202                if let Some(engine_rc) = data_engine_weak3.upgrade() {
203                    engine_rc.borrow_mut().response(resp.clone());
204                }
205            },
206        )));
207        msgbus::register(endpoint, handler);
208
209        // Register RiskEngine execute handler
210        let risk_engine_weak = WeakCell::from(Rc::downgrade(&risk_engine));
211        let endpoint = MessagingSwitchboard::risk_engine_execute();
212        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
213            move |cmd: &TradingCommand| {
214                if let Some(engine_rc) = risk_engine_weak.upgrade() {
215                    engine_rc.borrow_mut().execute(cmd.clone());
216                }
217            },
218        )));
219        msgbus::register(endpoint, handler);
220
221        // Register ExecEngine execute handler
222        let exec_engine_weak = WeakCell::from(Rc::downgrade(&exec_engine));
223        let exec_engine_weak_clone = exec_engine_weak.clone();
224        let endpoint = MessagingSwitchboard::exec_engine_execute();
225        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
226            move |cmd: &TradingCommand| {
227                if let Some(engine_rc) = exec_engine_weak.upgrade() {
228                    engine_rc.borrow().execute(cmd);
229                }
230            },
231        )));
232        msgbus::register(endpoint, handler);
233
234        // Register ExecEngine process handler
235        let endpoint = MessagingSwitchboard::exec_engine_process();
236        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
237            move |event: &OrderEventAny| {
238                if let Some(engine_rc) = exec_engine_weak_clone.upgrade() {
239                    engine_rc.borrow_mut().process(event);
240                } else {
241                    log::error!(
242                        "ExecEngine dropped, cannot process order event: {:?}",
243                        event.client_order_id()
244                    );
245                }
246            },
247        )));
248        msgbus::register(endpoint, handler);
249
250        // TODO: Implement actual reconciliation logic in ExecEngine
251        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
252        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
253            move |report: &dyn Any| {
254                log::debug!(
255                    "Received execution report for reconciliation: {:?}",
256                    report.type_id()
257                );
258            },
259        )));
260        msgbus::register(endpoint, handler);
261
262        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_mass_status();
263        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
264            move |report: &dyn Any| {
265                log::debug!(
266                    "Received execution mass status for reconciliation: {:?}",
267                    report.type_id()
268                );
269            },
270        )));
271        msgbus::register(endpoint, handler);
272
273        let trader = Trader::new(
274            config.trader_id(),
275            instance_id,
276            config.environment(),
277            clock.clone(),
278            cache.clone(),
279            portfolio.clone(),
280        );
281
282        let ts_created = clock.borrow().timestamp_ns();
283
284        Ok(Self {
285            name,
286            instance_id,
287            machine_id,
288            config: Box::new(config),
289            cache,
290            clock,
291            portfolio,
292            log_guard,
293            data_engine,
294            risk_engine,
295            exec_engine,
296            order_emulator,
297            trader,
298            ts_created,
299            ts_started: None,
300            ts_shutdown: None,
301        })
302    }
303
304    fn determine_machine_id() -> anyhow::Result<String> {
305        Ok(hostname::get()?.to_string_lossy().into_owned())
306    }
307
308    fn initialize_logging(
309        trader_id: TraderId,
310        instance_id: UUID4,
311        config: LoggerConfig,
312    ) -> anyhow::Result<LogGuard> {
313        let log_guard = init_logging(
314            trader_id,
315            instance_id,
316            config,
317            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
318        )?;
319
320        init_tracing()?;
321
322        Ok(log_guard)
323    }
324
325    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
326        match environment {
327            Environment::Backtest => {
328                let test_clock = TestClock::new();
329                Rc::new(RefCell::new(test_clock))
330            }
331            #[cfg(feature = "live")]
332            Environment::Live | Environment::Sandbox => {
333                let live_clock = LiveClock::default();
334                Rc::new(RefCell::new(live_clock))
335            }
336            #[cfg(not(feature = "live"))]
337            Environment::Live | Environment::Sandbox => {
338                panic!(
339                    "Live/Sandbox environment requires the 'live' feature to be enabled. \
340                     Build with `--features live` or add `features = [\"live\"]` to your dependency."
341                );
342            }
343        }
344    }
345
346    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
347        let cache_config = cache_config.unwrap_or_default();
348
349        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
350        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
351        let cache = Cache::new(Some(cache_config), cache_database);
352
353        Rc::new(RefCell::new(cache))
354    }
355
356    fn cancel_timers(&self) {
357        self.clock.borrow_mut().cancel_timers();
358    }
359
360    #[must_use]
361    pub fn generate_timestamp_ns(&self) -> UnixNanos {
362        self.clock.borrow().timestamp_ns()
363    }
364
365    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
366    #[must_use]
367    pub fn environment(&self) -> Environment {
368        self.config.environment()
369    }
370
371    /// Returns the kernel's name.
372    #[must_use]
373    pub const fn name(&self) -> &str {
374        self.name.as_str()
375    }
376
377    /// Returns the kernel's trader ID.
378    #[must_use]
379    pub fn trader_id(&self) -> TraderId {
380        self.config.trader_id()
381    }
382
383    /// Returns the kernel's machine ID.
384    #[must_use]
385    pub fn machine_id(&self) -> &str {
386        &self.machine_id
387    }
388
389    /// Returns the kernel's instance ID.
390    #[must_use]
391    pub const fn instance_id(&self) -> UUID4 {
392        self.instance_id
393    }
394
395    /// Returns the UNIX timestamp (ns) when the kernel was created.
396    #[must_use]
397    pub const fn ts_created(&self) -> UnixNanos {
398        self.ts_created
399    }
400
401    /// Returns the UNIX timestamp (ns) when the kernel was last started.
402    #[must_use]
403    pub const fn ts_started(&self) -> Option<UnixNanos> {
404        self.ts_started
405    }
406
407    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
408    #[must_use]
409    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
410        self.ts_shutdown
411    }
412
413    /// Returns whether the kernel has been configured to load state.
414    #[must_use]
415    pub fn load_state(&self) -> bool {
416        self.config.load_state()
417    }
418
419    /// Returns whether the kernel has been configured to save state.
420    #[must_use]
421    pub fn save_state(&self) -> bool {
422        self.config.save_state()
423    }
424
425    /// Returns the kernel's clock.
426    #[must_use]
427    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
428        self.clock.clone()
429    }
430
431    /// Returns the kernel's cache.
432    #[must_use]
433    pub fn cache(&self) -> Rc<RefCell<Cache>> {
434        self.cache.clone()
435    }
436
437    /// Returns the kernel's message bus.  // TODO: TBD if this is necessary
438    #[must_use]
439    pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
440        get_message_bus()
441    }
442
443    /// Returns the kernel's portfolio.
444    #[must_use]
445    pub fn portfolio(&self) -> Ref<'_, Portfolio> {
446        self.portfolio.borrow()
447    }
448
449    /// Returns the kernel's data engine.
450    #[must_use]
451    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
452        self.data_engine.borrow()
453    }
454
455    /// Returns the kernel's risk engine.
456    #[must_use]
457    pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
458        &self.risk_engine
459    }
460
461    /// Returns the kernel's execution engine.
462    #[must_use]
463    pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
464        &self.exec_engine
465    }
466
467    /// Returns the kernel's trader.
468    #[must_use]
469    pub const fn trader(&self) -> &Trader {
470        &self.trader
471    }
472
473    /// Starts the Nautilus system kernel.
474    pub async fn start_async(&mut self) {
475        log::info!("Starting");
476        self.start_engines();
477
478        log::info!("Initializing trader");
479        if let Err(e) = self.trader.initialize() {
480            log::error!("Error initializing trader: {e:?}");
481            return;
482        }
483
484        log::info!("Starting clients...");
485        if let Err(e) = self.start_clients() {
486            log::error!("Error starting clients: {e:?}");
487        }
488        log::info!("Clients started");
489
490        if let Err(e) = self.trader.start() {
491            log::error!("Error starting trader: {e:?}");
492        }
493
494        self.ts_started = Some(self.clock.borrow().timestamp_ns());
495        log::info!("Started");
496    }
497
498    /// Stops the Nautilus system kernel.
499    pub async fn stop_async(&mut self) {
500        log::info!("Stopping");
501
502        // Stop the trader (it will stop all registered components)
503        if let Err(e) = self.trader.stop() {
504            log::error!("Error stopping trader: {e:?}");
505        }
506
507        // Wait for residual events to be processed (e.g., cancel orders on stop)
508        #[cfg(feature = "live")]
509        {
510            let delay = self.config.delay_post_stop();
511            log::info!("Awaiting residual events ({delay:?})...");
512            std::thread::sleep(delay);
513        }
514
515        // Stop all adapter clients
516        if let Err(e) = self.stop_all_clients() {
517            log::error!("Error stopping clients: {e:?}");
518        }
519
520        self.stop_engines();
521        self.cancel_timers();
522
523        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
524        log::info!("Stopped");
525    }
526
527    /// Resets the Nautilus system kernel to its initial state.
528    pub fn reset(&mut self) {
529        log::info!("Resetting");
530
531        if let Err(e) = self.trader.reset() {
532            log::error!("Error resetting trader: {e:?}");
533        }
534
535        // Reset engines
536        self.data_engine.borrow_mut().reset();
537        // TODO: Reset other engines when reset methods are available
538
539        self.ts_started = None;
540        self.ts_shutdown = None;
541
542        log::info!("Reset");
543    }
544
545    /// Disposes of the Nautilus system kernel, releasing resources.
546    pub fn dispose(&mut self) {
547        log::info!("Disposing");
548
549        if let Err(e) = self.trader.dispose() {
550            log::error!("Error disposing trader: {e:?}");
551        }
552
553        self.stop_engines();
554
555        self.data_engine.borrow_mut().dispose();
556        // TODO: Implement dispose methods for other engines
557
558        log::info!("Disposed");
559    }
560
561    /// Cancels all tasks currently running under the kernel.
562    ///
563    /// Intended for cleanup during shutdown.
564    const fn cancel_all_tasks(&self) {
565        // TODO: implement task cancellation logic for async contexts
566    }
567
568    /// Starts all engine components.
569    fn start_engines(&self) {
570        self.data_engine.borrow_mut().start();
571        // TODO: Start other engines when methods are available
572    }
573
574    /// Stops all engine components.
575    fn stop_engines(&self) {
576        self.data_engine.borrow_mut().stop();
577        // TODO: Stop other engines when methods are available
578    }
579
580    /// Starts all engine clients.
581    ///
582    /// Note: Async connection (connect/disconnect) is handled by LiveNode for live clients.
583    /// This method only handles synchronous start operations on execution clients.
584    fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
585        let mut errors = Vec::new();
586
587        {
588            let mut exec_engine = self.exec_engine.borrow_mut();
589            let exec_adapters = exec_engine.get_clients_mut();
590
591            for adapter in exec_adapters {
592                if let Err(e) = adapter.start() {
593                    log::error!("Error starting execution client {}: {e}", adapter.client_id);
594                    errors.push(e);
595                }
596            }
597        }
598
599        if errors.is_empty() {
600            Ok(())
601        } else {
602            Err(errors)
603        }
604    }
605
606    /// Stops all engine clients.
607    ///
608    /// Note: Async disconnection is handled by LiveNode for live clients.
609    /// This method only handles synchronous stop operations on execution clients.
610    fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
611        let mut errors = Vec::new();
612
613        {
614            let mut exec_engine = self.exec_engine.borrow_mut();
615            let exec_adapters = exec_engine.get_clients_mut();
616
617            for adapter in exec_adapters {
618                if let Err(e) = adapter.stop() {
619                    log::error!("Error stopping execution client {}: {e}", adapter.client_id);
620                    errors.push(e);
621                }
622            }
623        }
624
625        if errors.is_empty() {
626            Ok(())
627        } else {
628            Err(errors)
629        }
630    }
631
632    /// Stops engine clients.
633    fn stop_clients(&self) {
634        self.data_engine.borrow_mut().stop();
635    }
636
637    /// Connects all engine clients.
638    ///
639    /// # Errors
640    ///
641    /// Returns an error if any client fails to connect.
642    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
643    pub async fn connect_clients(&mut self) -> anyhow::Result<()> {
644        log::info!("Connecting clients...");
645        self.data_engine.borrow_mut().connect().await?;
646        self.exec_engine.borrow_mut().connect().await?;
647        Ok(())
648    }
649
650    /// Disconnects all engine clients.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if any client fails to disconnect.
655    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
656    pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
657        log::info!("Disconnecting clients...");
658        self.data_engine.borrow_mut().disconnect().await?;
659        self.exec_engine.borrow_mut().disconnect().await?;
660        Ok(())
661    }
662
663    /// Initializes the portfolio (orders & positions).
664    const fn initialize_portfolio(&self) {
665        // TODO: Placeholder: portfolio initialization to be implemented in next pass
666    }
667
668    /// Awaits execution engine state reconciliation.
669    ///
670    /// Blocks until executions are reconciled or timeout.
671    const fn await_execution_reconciliation(&self) {
672        // TODO: await execution reconciliation with timeout
673    }
674
675    /// Awaits portfolio initialization.
676    ///
677    /// Blocks until portfolio is initialized or timeout.
678    const fn await_portfolio_initialized(&self) {
679        // TODO: await portfolio initialization with timeout
680    }
681
682    /// Awaits post-stop trader residual events.
683    ///
684    /// Allows final cleanup before full shutdown.
685    const fn await_trader_residuals(&self) {
686        // TODO: await trader residual events after stop
687    }
688
689    /// Returns `true` if all engine clients are connected.
690    #[must_use]
691    pub fn check_engines_connected(&self) -> bool {
692        self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
693    }
694
695    /// Returns `true` if all engine clients are disconnected.
696    #[must_use]
697    pub fn check_engines_disconnected(&self) -> bool {
698        self.data_engine.borrow().check_disconnected()
699            && self.exec_engine.borrow().check_disconnected()
700    }
701
702    /// Checks if the portfolio has been initialized.
703    const fn check_portfolio_initialized(&self) {
704        // TODO: check portfolio initialized status
705    }
706
707    /// Flushes the stream writer.
708    const fn flush_writer(&self) {
709        // TODO: No writer in this kernel version; placeholder for future streaming
710    }
711}