nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    any::Any,
22    cell::{Ref, RefCell},
23    rc::Rc,
24};
25
26use futures::future::join_all;
27use nautilus_common::{
28    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
29    clock::{Clock, LiveClock, TestClock},
30    component::Component,
31    enums::Environment,
32    logging::{
33        headers, init_logging, init_tracing,
34        logger::{LogGuard, LoggerConfig},
35        writer::FileWriterConfig,
36    },
37    messages::{DataResponse, data::DataCommand},
38    msgbus::{
39        self, MessageBus, get_message_bus,
40        handler::{ShareableMessageHandler, TypedMessageHandler},
41        set_message_bus,
42        switchboard::MessagingSwitchboard,
43    },
44    runner::get_data_cmd_sender,
45};
46use nautilus_core::{UUID4, UnixNanos};
47use nautilus_data::engine::DataEngine;
48use nautilus_execution::engine::ExecutionEngine;
49use nautilus_model::identifiers::TraderId;
50use nautilus_portfolio::portfolio::Portfolio;
51use nautilus_risk::engine::RiskEngine;
52use ustr::Ustr;
53
54use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
55
56/// Core Nautilus system kernel.
57///
58/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
59#[derive(Debug)]
60pub struct NautilusKernel {
61    /// The kernel name (for logging and identification).
62    pub name: String,
63    /// The unique instance identifier for this kernel.
64    pub instance_id: UUID4,
65    /// The machine identifier (hostname or similar).
66    pub machine_id: String,
67    /// The kernel configuration.
68    pub config: Box<dyn NautilusKernelConfig>,
69    /// The shared in-memory cache.
70    pub cache: Rc<RefCell<Cache>>,
71    /// The clock driving the kernel.
72    pub clock: Rc<RefCell<dyn Clock>>,
73    /// The portfolio manager.
74    pub portfolio: Portfolio,
75    /// Guard for the logging subsystem (keeps logger thread alive).
76    pub log_guard: LogGuard,
77    /// The data engine instance.
78    pub data_engine: Rc<RefCell<DataEngine>>,
79    /// The risk engine instance.
80    pub risk_engine: RiskEngine,
81    /// The execution engine instance.
82    pub exec_engine: ExecutionEngine,
83    /// The trader component.
84    pub trader: Trader,
85    /// The UNIX timestamp (nanoseconds) when the kernel was created.
86    pub ts_created: UnixNanos,
87    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
88    pub ts_started: Option<UnixNanos>,
89    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
90    pub ts_shutdown: Option<UnixNanos>,
91}
92
93impl NautilusKernel {
94    /// Create a new [`NautilusKernelBuilder`] for fluent configuration.
95    #[must_use]
96    pub const fn builder(
97        name: String,
98        trader_id: TraderId,
99        environment: nautilus_common::enums::Environment,
100    ) -> NautilusKernelBuilder {
101        NautilusKernelBuilder::new(name, trader_id, environment)
102    }
103
104    /// Create a new [`NautilusKernel`] instance.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the kernel fails to initialize.
109    pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
110        let instance_id = config.instance_id().unwrap_or_default();
111        let machine_id = Self::determine_machine_id()?;
112
113        let logger_config = config.logging();
114        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
115        headers::log_header(
116            config.trader_id(),
117            &machine_id,
118            instance_id,
119            Ustr::from(stringify!(LiveNode)),
120        );
121
122        log::info!("Building system kernel");
123
124        let clock = Self::initialize_clock(&config.environment());
125        let cache = Self::initialize_cache(config.cache());
126
127        let msgbus = Rc::new(RefCell::new(MessageBus::new(
128            config.trader_id(),
129            instance_id,
130            Some(name.to_string()),
131            None,
132        )));
133        set_message_bus(msgbus);
134
135        let portfolio = Portfolio::new(cache.clone(), clock.clone(), config.portfolio());
136        let risk_engine = RiskEngine::new(
137            config.risk_engine().unwrap_or_default(),
138            Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
139            clock.clone(),
140            cache.clone(),
141        );
142        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
143
144        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
145        let data_engine = Rc::new(RefCell::new(data_engine));
146
147        // Register DataEngine command execution
148        use nautilus_core::WeakCell;
149
150        let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
151        let data_engine_weak_clone1 = data_engine_weak.clone();
152        let endpoint = MessagingSwitchboard::data_engine_execute();
153        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
154            move |cmd: &DataCommand| {
155                if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
156                    engine_rc.borrow_mut().execute(cmd);
157                }
158            },
159        )));
160        msgbus::register(endpoint, handler);
161
162        // Register DataEngine command queueing
163        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
164        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
165            move |cmd: &DataCommand| {
166                get_data_cmd_sender().clone().execute(cmd.clone());
167            },
168        )));
169        msgbus::register(endpoint, handler);
170
171        // Register DataEngine process handler
172        let endpoint = MessagingSwitchboard::data_engine_process();
173        let data_engine_weak2 = data_engine_weak.clone();
174        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
175            move |data: &dyn Any| {
176                if let Some(engine_rc) = data_engine_weak2.upgrade() {
177                    engine_rc.borrow_mut().process(data);
178                }
179            },
180        )));
181        msgbus::register(endpoint, handler);
182
183        // Register DataEngine response handler
184        let endpoint = MessagingSwitchboard::data_engine_response();
185        let data_engine_weak3 = data_engine_weak;
186        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
187            move |resp: &DataResponse| {
188                if let Some(engine_rc) = data_engine_weak3.upgrade() {
189                    engine_rc.borrow_mut().response(resp.clone());
190                }
191            },
192        )));
193        msgbus::register(endpoint, handler);
194
195        let trader = Trader::new(
196            config.trader_id(),
197            instance_id,
198            config.environment(),
199            clock.clone(),
200            cache.clone(),
201        );
202
203        let ts_created = clock.borrow().timestamp_ns();
204
205        Ok(Self {
206            name,
207            instance_id,
208            machine_id,
209            config: Box::new(config),
210            cache,
211            clock,
212            portfolio,
213            log_guard,
214            data_engine,
215            risk_engine,
216            exec_engine,
217            trader,
218            ts_created,
219            ts_started: None,
220            ts_shutdown: None,
221        })
222    }
223
224    fn determine_machine_id() -> anyhow::Result<String> {
225        Ok(hostname::get()?.to_string_lossy().into_owned())
226    }
227
228    fn initialize_logging(
229        trader_id: TraderId,
230        instance_id: UUID4,
231        config: LoggerConfig,
232    ) -> anyhow::Result<LogGuard> {
233        init_tracing()?;
234
235        let log_guard = init_logging(
236            trader_id,
237            instance_id,
238            config,
239            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
240        )?;
241
242        Ok(log_guard)
243    }
244
245    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
246        match environment {
247            Environment::Backtest => {
248                let test_clock = TestClock::new();
249                Rc::new(RefCell::new(test_clock))
250            }
251            Environment::Live | Environment::Sandbox => {
252                let live_clock = LiveClock::default();
253                Rc::new(RefCell::new(live_clock))
254            }
255        }
256    }
257
258    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
259        let cache_config = cache_config.unwrap_or_default();
260
261        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
262        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
263        let cache = Cache::new(Some(cache_config), cache_database);
264
265        Rc::new(RefCell::new(cache))
266    }
267
268    fn cancel_timers(&self) {
269        self.clock.borrow_mut().cancel_timers();
270    }
271
272    #[must_use]
273    pub fn generate_timestamp_ns(&self) -> UnixNanos {
274        self.clock.borrow().timestamp_ns()
275    }
276
277    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
278    #[must_use]
279    pub fn environment(&self) -> Environment {
280        self.config.environment()
281    }
282
283    /// Returns the kernel's name.
284    #[must_use]
285    pub const fn name(&self) -> &str {
286        self.name.as_str()
287    }
288
289    /// Returns the kernel's trader ID.
290    #[must_use]
291    pub fn trader_id(&self) -> TraderId {
292        self.config.trader_id()
293    }
294
295    /// Returns the kernel's machine ID.
296    #[must_use]
297    pub fn machine_id(&self) -> &str {
298        &self.machine_id
299    }
300
301    /// Returns the kernel's instance ID.
302    #[must_use]
303    pub const fn instance_id(&self) -> UUID4 {
304        self.instance_id
305    }
306
307    /// Returns the UNIX timestamp (ns) when the kernel was created.
308    #[must_use]
309    pub const fn ts_created(&self) -> UnixNanos {
310        self.ts_created
311    }
312
313    /// Returns the UNIX timestamp (ns) when the kernel was last started.
314    #[must_use]
315    pub const fn ts_started(&self) -> Option<UnixNanos> {
316        self.ts_started
317    }
318
319    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
320    #[must_use]
321    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
322        self.ts_shutdown
323    }
324
325    /// Returns whether the kernel has been configured to load state.
326    #[must_use]
327    pub fn load_state(&self) -> bool {
328        self.config.load_state()
329    }
330
331    /// Returns whether the kernel has been configured to save state.
332    #[must_use]
333    pub fn save_state(&self) -> bool {
334        self.config.save_state()
335    }
336
337    /// Returns the kernel's clock.
338    #[must_use]
339    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
340        self.clock.clone()
341    }
342
343    /// Returns the kernel's cache.
344    #[must_use]
345    pub fn cache(&self) -> Rc<RefCell<Cache>> {
346        self.cache.clone()
347    }
348
349    /// Returns the kernel's message bus.  // TODO: TBD if this is necessary
350    #[must_use]
351    pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
352        get_message_bus()
353    }
354
355    /// Returns the kernel's portfolio.
356    #[must_use]
357    pub const fn portfolio(&self) -> &Portfolio {
358        &self.portfolio
359    }
360
361    /// Returns the kernel's data engine.
362    #[must_use]
363    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
364        self.data_engine.borrow()
365    }
366
367    /// Returns the kernel's risk engine.
368    #[must_use]
369    pub const fn risk_engine(&self) -> &RiskEngine {
370        &self.risk_engine
371    }
372
373    /// Returns the kernel's execution engine.
374    #[must_use]
375    pub const fn exec_engine(&self) -> &ExecutionEngine {
376        &self.exec_engine
377    }
378
379    /// Returns the kernel's trader.
380    #[must_use]
381    pub const fn trader(&self) -> &Trader {
382        &self.trader
383    }
384
385    /// Starts the Nautilus system kernel.
386    pub async fn start_async(&mut self) {
387        log::info!("Starting");
388        self.start_engines();
389
390        log::info!("Initializing trader");
391        if let Err(e) = self.trader.initialize() {
392            log::error!("Error initializing trader: {e:?}");
393            return;
394        }
395
396        log::info!("Connecting clients...");
397        if let Err(e) = self.connect_clients().await {
398            log::error!("Error connecting clients: {e:?}");
399        }
400        log::info!("Clients connected");
401
402        if let Err(e) = self.trader.start() {
403            log::error!("Error starting trader: {e:?}");
404        }
405
406        self.ts_started = Some(self.clock.borrow().timestamp_ns());
407        log::info!("Started");
408    }
409
410    /// Stops the Nautilus system kernel.
411    pub async fn stop_async(&mut self) {
412        log::info!("Stopping");
413
414        // Stop the trader (it will stop all registered components)
415        if let Err(e) = self.trader.stop() {
416            log::error!("Error stopping trader: {e:?}");
417        }
418
419        // Disconnect all adapter clients
420        if let Err(e) = self.disconnect_clients().await {
421            log::error!("Error disconnecting clients: {e:?}");
422        }
423
424        self.stop_engines();
425        self.cancel_timers();
426
427        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
428        log::info!("Stopped");
429    }
430
431    /// Resets the Nautilus system kernel to its initial state.
432    pub fn reset(&mut self) {
433        log::info!("Resetting");
434
435        if let Err(e) = self.trader.reset() {
436            log::error!("Error resetting trader: {e:?}");
437        }
438
439        // Reset engines
440        self.data_engine.borrow_mut().reset();
441        // TODO: Reset other engines when reset methods are available
442
443        self.ts_started = None;
444        self.ts_shutdown = None;
445
446        log::info!("Reset");
447    }
448
449    /// Disposes of the Nautilus system kernel, releasing resources.
450    pub fn dispose(&mut self) {
451        log::info!("Disposing");
452
453        if let Err(e) = self.trader.dispose() {
454            log::error!("Error disposing trader: {e:?}");
455        }
456
457        self.stop_engines();
458
459        self.data_engine.borrow_mut().dispose();
460        // TODO: Implement dispose methods for other engines
461
462        log::info!("Disposed");
463    }
464
465    /// Cancels all tasks currently running under the kernel.
466    ///
467    /// Intended for cleanup during shutdown.
468    const fn cancel_all_tasks(&self) {
469        // TODO: implement task cancellation logic for async contexts
470    }
471
472    /// Starts all engine components.
473    fn start_engines(&self) {
474        self.data_engine.borrow_mut().start();
475        // TODO: Start other engines when methods are available
476    }
477
478    /// Stops all engine components.
479    fn stop_engines(&self) {
480        self.data_engine.borrow_mut().stop();
481        // TODO: Stop other engines when methods are available
482    }
483
484    /// Connects all engine clients.
485    #[allow(clippy::await_holding_refcell_ref)]
486    async fn connect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
487        let mut data_engine = self.data_engine.borrow_mut();
488        let mut data_adapters = data_engine.get_clients_mut();
489        let mut futures = Vec::with_capacity(data_adapters.len());
490
491        for adapter in &mut data_adapters {
492            futures.push(adapter.connect());
493        }
494
495        let results = join_all(futures).await;
496        let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
497
498        if errors.is_empty() {
499            Ok(())
500        } else {
501            Err(errors)
502        }
503    }
504
505    /// Disconnects all engine clients.
506    #[allow(clippy::await_holding_refcell_ref)]
507    async fn disconnect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
508        let mut data_engine = self.data_engine.borrow_mut();
509        let mut data_adapters = data_engine.get_clients_mut();
510        let mut futures = Vec::with_capacity(data_adapters.len());
511
512        for adapter in &mut data_adapters {
513            futures.push(adapter.disconnect());
514        }
515
516        let results = join_all(futures).await;
517        let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
518
519        if errors.is_empty() {
520            Ok(())
521        } else {
522            Err(errors)
523        }
524    }
525
526    /// Stops engine clients.
527    fn stop_clients(&self) {
528        self.data_engine.borrow_mut().stop();
529    }
530
531    /// Initializes the portfolio (orders & positions).
532    const fn initialize_portfolio(&self) {
533        // TODO: Placeholder: portfolio initialization to be implemented in next pass
534    }
535
536    /// Awaits engine clients to connect and initialize.
537    ///
538    /// Blocks until connected or timeout.
539    const fn await_engines_connected(&self) {
540        // TODO: await engine connections with timeout
541    }
542
543    /// Awaits execution engine state reconciliation.
544    ///
545    /// Blocks until executions are reconciled or timeout.
546    const fn await_execution_reconciliation(&self) {
547        // TODO: await execution reconciliation with timeout
548    }
549
550    /// Awaits portfolio initialization.
551    ///
552    /// Blocks until portfolio is initialized or timeout.
553    const fn await_portfolio_initialized(&self) {
554        // TODO: await portfolio initialization with timeout
555    }
556
557    /// Awaits post-stop trader residual events.
558    ///
559    /// Allows final cleanup before full shutdown.
560    const fn await_trader_residuals(&self) {
561        // TODO: await trader residual events after stop
562    }
563
564    /// Checks if engine clients are connected.
565    const fn check_engines_connected(&self) {
566        // TODO: check engine connection status
567    }
568
569    /// Checks if engine clients are disconnected.
570    const fn check_engines_disconnected(&self) {
571        // TODO: check engine disconnection status
572    }
573
574    /// Checks if the portfolio has been initialized.
575    const fn check_portfolio_initialized(&self) {
576        // TODO: check portfolio initialized status
577    }
578
579    /// Flushes the stream writer.
580    const fn flush_writer(&self) {
581        // TODO: No writer in this kernel version; placeholder for future streaming
582    }
583}