nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    any::Any,
22    cell::{Ref, RefCell},
23    rc::Rc,
24    time::Duration,
25};
26
27#[cfg(feature = "live")]
28use nautilus_common::live::clock::LiveClock;
29use nautilus_common::{
30    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
31    clock::{Clock, TestClock},
32    component::Component,
33    enums::Environment,
34    logging::{
35        headers, init_logging, init_tracing,
36        logger::{LogGuard, LoggerConfig},
37        writer::FileWriterConfig,
38    },
39    messages::{DataResponse, data::DataCommand, execution::TradingCommand},
40    msgbus::{
41        self, MessageBus, get_message_bus,
42        handler::{ShareableMessageHandler, TypedMessageHandler},
43        set_message_bus,
44        switchboard::MessagingSwitchboard,
45    },
46    runner::get_data_cmd_sender,
47};
48use nautilus_core::{UUID4, UnixNanos, WeakCell};
49use nautilus_data::engine::DataEngine;
50use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
51use nautilus_model::{
52    enums::OrderStatus,
53    events::{OrderCanceled, OrderEventAny, OrderExpired},
54    identifiers::TraderId,
55    orders::Order,
56    reports::OrderStatusReport,
57};
58use nautilus_portfolio::portfolio::Portfolio;
59use nautilus_risk::engine::RiskEngine;
60use ustr::Ustr;
61
62use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
63
64/// Core Nautilus system kernel.
65///
66/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
67#[derive(Debug)]
68pub struct NautilusKernel {
69    /// The kernel name (for logging and identification).
70    pub name: String,
71    /// The unique instance identifier for this kernel.
72    pub instance_id: UUID4,
73    /// The machine identifier (hostname or similar).
74    pub machine_id: String,
75    /// The kernel configuration.
76    pub config: Box<dyn NautilusKernelConfig>,
77    /// The shared in-memory cache.
78    pub cache: Rc<RefCell<Cache>>,
79    /// The clock driving the kernel.
80    pub clock: Rc<RefCell<dyn Clock>>,
81    /// The portfolio manager.
82    pub portfolio: Rc<RefCell<Portfolio>>,
83    /// Guard for the logging subsystem (keeps logger thread alive).
84    pub log_guard: LogGuard,
85    /// The data engine instance.
86    pub data_engine: Rc<RefCell<DataEngine>>,
87    /// The risk engine instance.
88    pub risk_engine: Rc<RefCell<RiskEngine>>,
89    /// The execution engine instance.
90    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
91    /// The order emulator for handling emulated orders.
92    pub order_emulator: OrderEmulatorAdapter,
93    /// The trader component.
94    pub trader: Trader,
95    /// The UNIX timestamp (nanoseconds) when the kernel was created.
96    pub ts_created: UnixNanos,
97    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
98    pub ts_started: Option<UnixNanos>,
99    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
100    pub ts_shutdown: Option<UnixNanos>,
101}
102
103impl NautilusKernel {
104    /// Create a new [`NautilusKernelBuilder`] for fluent configuration.
105    #[must_use]
106    pub const fn builder(
107        name: String,
108        trader_id: TraderId,
109        environment: Environment,
110    ) -> NautilusKernelBuilder {
111        NautilusKernelBuilder::new(name, trader_id, environment)
112    }
113
114    /// Create a new [`NautilusKernel`] instance.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the kernel fails to initialize.
119    pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
120        let instance_id = config.instance_id().unwrap_or_default();
121        let machine_id = Self::determine_machine_id()?;
122
123        let logger_config = config.logging();
124        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
125        headers::log_header(
126            config.trader_id(),
127            &machine_id,
128            instance_id,
129            Ustr::from(stringify!(LiveNode)),
130        );
131
132        log::info!("Building system kernel");
133
134        let clock = Self::initialize_clock(&config.environment());
135        let cache = Self::initialize_cache(config.cache());
136
137        let msgbus = Rc::new(RefCell::new(MessageBus::new(
138            config.trader_id(),
139            instance_id,
140            Some(name.clone()),
141            None,
142        )));
143        set_message_bus(msgbus);
144
145        let portfolio = Rc::new(RefCell::new(Portfolio::new(
146            cache.clone(),
147            clock.clone(),
148            config.portfolio(),
149        )));
150
151        let risk_engine = RiskEngine::new(
152            config.risk_engine().unwrap_or_default(),
153            portfolio.borrow().clone_shallow(),
154            clock.clone(),
155            cache.clone(),
156        );
157        let risk_engine = Rc::new(RefCell::new(risk_engine));
158
159        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
160        let exec_engine = Rc::new(RefCell::new(exec_engine));
161
162        // Create order emulator (auto-registers message handlers)
163        let order_emulator = OrderEmulatorAdapter::new(clock.clone(), cache.clone());
164
165        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
166        let data_engine = Rc::new(RefCell::new(data_engine));
167
168        // Register DataEngine command execution
169
170        let data_engine_weak = WeakCell::from(Rc::downgrade(&data_engine));
171        let data_engine_weak_clone1 = data_engine_weak.clone();
172        let endpoint = MessagingSwitchboard::data_engine_execute();
173        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
174            move |cmd: &DataCommand| {
175                if let Some(engine_rc) = data_engine_weak_clone1.upgrade() {
176                    engine_rc.borrow_mut().execute(cmd);
177                }
178            },
179        )));
180        msgbus::register(endpoint, handler);
181
182        // Register DataEngine command queueing
183        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
184        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
185            move |cmd: &DataCommand| {
186                get_data_cmd_sender().clone().execute(cmd.clone());
187            },
188        )));
189        msgbus::register(endpoint, handler);
190
191        // Register DataEngine process handler
192        let endpoint = MessagingSwitchboard::data_engine_process();
193        let data_engine_weak2 = data_engine_weak.clone();
194        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
195            move |data: &dyn Any| {
196                if let Some(engine_rc) = data_engine_weak2.upgrade() {
197                    engine_rc.borrow_mut().process(data);
198                }
199            },
200        )));
201        msgbus::register(endpoint, handler);
202
203        // Register DataEngine response handler
204        let endpoint = MessagingSwitchboard::data_engine_response();
205        let data_engine_weak3 = data_engine_weak;
206        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
207            move |resp: &DataResponse| {
208                if let Some(engine_rc) = data_engine_weak3.upgrade() {
209                    engine_rc.borrow_mut().response(resp.clone());
210                }
211            },
212        )));
213        msgbus::register(endpoint, handler);
214
215        // Register RiskEngine execute handler
216        let risk_engine_weak = WeakCell::from(Rc::downgrade(&risk_engine));
217        let endpoint = MessagingSwitchboard::risk_engine_execute();
218        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
219            move |cmd: &TradingCommand| {
220                if let Some(engine_rc) = risk_engine_weak.upgrade() {
221                    engine_rc.borrow_mut().execute(cmd.clone());
222                }
223            },
224        )));
225        msgbus::register(endpoint, handler);
226
227        // Register ExecEngine execute handler
228        let exec_engine_weak = WeakCell::from(Rc::downgrade(&exec_engine));
229        let exec_engine_weak_clone = exec_engine_weak.clone();
230        let endpoint = MessagingSwitchboard::exec_engine_execute();
231        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
232            move |cmd: &TradingCommand| {
233                if let Some(engine_rc) = exec_engine_weak.upgrade() {
234                    engine_rc.borrow().execute(cmd);
235                }
236            },
237        )));
238        msgbus::register(endpoint, handler);
239
240        // Register ExecEngine process handler
241        let endpoint = MessagingSwitchboard::exec_engine_process();
242        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
243            move |event: &OrderEventAny| {
244                if let Some(engine_rc) = exec_engine_weak_clone.upgrade() {
245                    engine_rc.borrow_mut().process(event);
246                } else {
247                    log::error!(
248                        "ExecEngine dropped, cannot process order event: {:?}",
249                        event.client_order_id()
250                    );
251                }
252            },
253        )));
254        msgbus::register(endpoint, handler);
255
256        let cache_weak = WeakCell::from(Rc::downgrade(&cache));
257        let exec_engine_weak2 = WeakCell::from(Rc::downgrade(&exec_engine));
258        let trader_id = config.trader_id();
259
260        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
261        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
262            move |report: &OrderStatusReport| {
263                let Some(cache_rc) = cache_weak.upgrade() else {
264                    log::error!("Cache dropped, cannot reconcile order status report");
265                    return;
266                };
267                let Some(exec_engine_rc) = exec_engine_weak2.upgrade() else {
268                    log::error!("ExecEngine dropped, cannot reconcile order status report");
269                    return;
270                };
271
272                let cache = cache_rc.borrow();
273
274                let order = report
275                    .client_order_id
276                    .and_then(|id| cache.order(&id).cloned())
277                    .or_else(|| {
278                        cache
279                            .client_order_id(&report.venue_order_id)
280                            .and_then(|cid| cache.order(cid).cloned())
281                    });
282
283                let Some(order) = order else {
284                    log::debug!(
285                        "Order not found in cache for reconciliation: client_order_id={:?}, venue_order_id={}",
286                        report.client_order_id,
287                        report.venue_order_id
288                    );
289                    return;
290                };
291
292                if order.status() == report.order_status {
293                    return;
294                }
295
296                if !order.is_open() {
297                    return;
298                }
299
300                drop(cache); // Release borrow before processing
301
302                let event: Option<OrderEventAny> = match report.order_status {
303                    OrderStatus::Canceled => {
304                        log::debug!(
305                            "Reconciling canceled order: client_order_id={}, venue_order_id={}",
306                            order.client_order_id(),
307                            report.venue_order_id
308                        );
309                        Some(OrderEventAny::Canceled(OrderCanceled::new(
310                            trader_id,
311                            order.strategy_id(),
312                            order.instrument_id(),
313                            order.client_order_id(),
314                            UUID4::new(),
315                            report.ts_last,
316                            report.ts_init,
317                            true, // reconciliation
318                            Some(report.venue_order_id),
319                            Some(report.account_id),
320                        )))
321                    }
322                    OrderStatus::Expired => {
323                        log::debug!(
324                            "Reconciling expired order: client_order_id={}, venue_order_id={}",
325                            order.client_order_id(),
326                            report.venue_order_id
327                        );
328                        Some(OrderEventAny::Expired(OrderExpired::new(
329                            trader_id,
330                            order.strategy_id(),
331                            order.instrument_id(),
332                            order.client_order_id(),
333                            UUID4::new(),
334                            report.ts_last,
335                            report.ts_init,
336                            true, // reconciliation
337                            Some(report.venue_order_id),
338                            Some(report.account_id),
339                        )))
340                    }
341                    _ => None,
342                };
343
344                if let Some(evt) = event {
345                    exec_engine_rc.borrow_mut().process(&evt);
346                }
347            },
348        )));
349        msgbus::register(endpoint, handler);
350
351        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_mass_status();
352        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
353            move |report: &dyn Any| {
354                log::debug!(
355                    "Received execution mass status for reconciliation: {:?}",
356                    report.type_id()
357                );
358            },
359        )));
360        msgbus::register(endpoint, handler);
361
362        let trader = Trader::new(
363            config.trader_id(),
364            instance_id,
365            config.environment(),
366            clock.clone(),
367            cache.clone(),
368            portfolio.clone(),
369        );
370
371        let ts_created = clock.borrow().timestamp_ns();
372
373        Ok(Self {
374            name,
375            instance_id,
376            machine_id,
377            config: Box::new(config),
378            cache,
379            clock,
380            portfolio,
381            log_guard,
382            data_engine,
383            risk_engine,
384            exec_engine,
385            order_emulator,
386            trader,
387            ts_created,
388            ts_started: None,
389            ts_shutdown: None,
390        })
391    }
392
393    fn determine_machine_id() -> anyhow::Result<String> {
394        sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
395    }
396
397    fn initialize_logging(
398        trader_id: TraderId,
399        instance_id: UUID4,
400        config: LoggerConfig,
401    ) -> anyhow::Result<LogGuard> {
402        let log_guard = init_logging(
403            trader_id,
404            instance_id,
405            config,
406            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
407        )?;
408
409        init_tracing()?;
410
411        Ok(log_guard)
412    }
413
414    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
415        match environment {
416            Environment::Backtest => {
417                let test_clock = TestClock::new();
418                Rc::new(RefCell::new(test_clock))
419            }
420            #[cfg(feature = "live")]
421            Environment::Live | Environment::Sandbox => {
422                let live_clock = LiveClock::default();
423                Rc::new(RefCell::new(live_clock))
424            }
425            #[cfg(not(feature = "live"))]
426            Environment::Live | Environment::Sandbox => {
427                panic!(
428                    "Live/Sandbox environment requires the 'live' feature to be enabled. \
429                     Build with `--features live` or add `features = [\"live\"]` to your dependency."
430                );
431            }
432        }
433    }
434
435    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
436        let cache_config = cache_config.unwrap_or_default();
437
438        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
439        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
440        let cache = Cache::new(Some(cache_config), cache_database);
441
442        Rc::new(RefCell::new(cache))
443    }
444
445    fn cancel_timers(&self) {
446        self.clock.borrow_mut().cancel_timers();
447    }
448
449    #[must_use]
450    pub fn generate_timestamp_ns(&self) -> UnixNanos {
451        self.clock.borrow().timestamp_ns()
452    }
453
454    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
455    #[must_use]
456    pub fn environment(&self) -> Environment {
457        self.config.environment()
458    }
459
460    /// Returns the kernel's name.
461    #[must_use]
462    pub const fn name(&self) -> &str {
463        self.name.as_str()
464    }
465
466    /// Returns the kernel's trader ID.
467    #[must_use]
468    pub fn trader_id(&self) -> TraderId {
469        self.config.trader_id()
470    }
471
472    /// Returns the kernel's machine ID.
473    #[must_use]
474    pub fn machine_id(&self) -> &str {
475        &self.machine_id
476    }
477
478    /// Returns the kernel's instance ID.
479    #[must_use]
480    pub const fn instance_id(&self) -> UUID4 {
481        self.instance_id
482    }
483
484    /// Returns the delay after stopping the node to await residual events before final shutdown.
485    #[must_use]
486    pub fn delay_post_stop(&self) -> Duration {
487        self.config.delay_post_stop()
488    }
489
490    /// Returns the UNIX timestamp (ns) when the kernel was created.
491    #[must_use]
492    pub const fn ts_created(&self) -> UnixNanos {
493        self.ts_created
494    }
495
496    /// Returns the UNIX timestamp (ns) when the kernel was last started.
497    #[must_use]
498    pub const fn ts_started(&self) -> Option<UnixNanos> {
499        self.ts_started
500    }
501
502    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
503    #[must_use]
504    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
505        self.ts_shutdown
506    }
507
508    /// Returns whether the kernel has been configured to load state.
509    #[must_use]
510    pub fn load_state(&self) -> bool {
511        self.config.load_state()
512    }
513
514    /// Returns whether the kernel has been configured to save state.
515    #[must_use]
516    pub fn save_state(&self) -> bool {
517        self.config.save_state()
518    }
519
520    /// Returns the kernel's clock.
521    #[must_use]
522    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
523        self.clock.clone()
524    }
525
526    /// Returns the kernel's cache.
527    #[must_use]
528    pub fn cache(&self) -> Rc<RefCell<Cache>> {
529        self.cache.clone()
530    }
531
532    /// Returns the kernel's message bus.  // TODO: TBD if this is necessary
533    #[must_use]
534    pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
535        get_message_bus()
536    }
537
538    /// Returns the kernel's portfolio.
539    #[must_use]
540    pub fn portfolio(&self) -> Ref<'_, Portfolio> {
541        self.portfolio.borrow()
542    }
543
544    /// Returns the kernel's data engine.
545    #[must_use]
546    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
547        self.data_engine.borrow()
548    }
549
550    /// Returns the kernel's risk engine.
551    #[must_use]
552    pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
553        &self.risk_engine
554    }
555
556    /// Returns the kernel's execution engine.
557    #[must_use]
558    pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
559        &self.exec_engine
560    }
561
562    /// Returns the kernel's trader.
563    #[must_use]
564    pub const fn trader(&self) -> &Trader {
565        &self.trader
566    }
567
568    /// Starts the Nautilus system kernel.
569    pub async fn start_async(&mut self) {
570        log::info!("Starting");
571        self.start_engines();
572
573        log::info!("Initializing trader");
574        if let Err(e) = self.trader.initialize() {
575            log::error!("Error initializing trader: {e:?}");
576            return;
577        }
578
579        log::info!("Starting clients...");
580        if let Err(e) = self.start_clients() {
581            log::error!("Error starting clients: {e:?}");
582        }
583        log::info!("Clients started");
584
585        if let Err(e) = self.trader.start() {
586            log::error!("Error starting trader: {e:?}");
587        }
588
589        self.ts_started = Some(self.clock.borrow().timestamp_ns());
590        log::info!("Started");
591    }
592
593    /// Stops the trader and its registered components.
594    ///
595    /// This method initiates a graceful shutdown of trading components (strategies, actors)
596    /// which may trigger residual events such as order cancellations. The caller should
597    /// continue processing events after calling this method to handle these residual events.
598    pub fn stop_trader(&mut self) {
599        log::info!("Stopping");
600
601        // Stop the trader (it will stop all registered components)
602        if let Err(e) = self.trader.stop() {
603            log::error!("Error stopping trader: {e:?}");
604        }
605    }
606
607    /// Finalizes the kernel shutdown after the grace period.
608    ///
609    /// This method should be called after the residual events grace period has elapsed
610    /// and all remaining events have been processed. It disconnects clients and stops engines.
611    pub async fn finalize_stop(&mut self) {
612        // Stop all adapter clients
613        if let Err(e) = self.stop_all_clients() {
614            log::error!("Error stopping clients: {e:?}");
615        }
616
617        self.stop_engines();
618        self.cancel_timers();
619
620        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
621        log::info!("Stopped");
622    }
623
624    /// Resets the Nautilus system kernel to its initial state.
625    pub fn reset(&mut self) {
626        log::info!("Resetting");
627
628        if let Err(e) = self.trader.reset() {
629            log::error!("Error resetting trader: {e:?}");
630        }
631
632        // Reset engines
633        self.data_engine.borrow_mut().reset();
634        // TODO: Reset other engines when reset methods are available
635
636        self.ts_started = None;
637        self.ts_shutdown = None;
638
639        log::info!("Reset");
640    }
641
642    /// Disposes of the Nautilus system kernel, releasing resources.
643    pub fn dispose(&mut self) {
644        log::info!("Disposing");
645
646        if let Err(e) = self.trader.dispose() {
647            log::error!("Error disposing trader: {e:?}");
648        }
649
650        self.stop_engines();
651
652        self.data_engine.borrow_mut().dispose();
653        // TODO: Implement dispose methods for other engines
654
655        log::info!("Disposed");
656    }
657
658    /// Cancels all tasks currently running under the kernel.
659    ///
660    /// Intended for cleanup during shutdown.
661    const fn cancel_all_tasks(&self) {
662        // TODO: implement task cancellation logic for async contexts
663    }
664
665    /// Starts all engine components.
666    fn start_engines(&self) {
667        self.data_engine.borrow_mut().start();
668        // TODO: Start other engines when methods are available
669    }
670
671    /// Stops all engine components.
672    fn stop_engines(&self) {
673        self.data_engine.borrow_mut().stop();
674        // TODO: Stop other engines when methods are available
675    }
676
677    /// Starts all engine clients.
678    ///
679    /// Note: Async connection (connect/disconnect) is handled by LiveNode for live clients.
680    /// This method only handles synchronous start operations on execution clients.
681    fn start_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
682        let mut errors = Vec::new();
683
684        {
685            let mut exec_engine = self.exec_engine.borrow_mut();
686            let exec_adapters = exec_engine.get_clients_mut();
687
688            for adapter in exec_adapters {
689                if let Err(e) = adapter.start() {
690                    log::error!("Error starting execution client {}: {e}", adapter.client_id);
691                    errors.push(e);
692                }
693            }
694        }
695
696        if errors.is_empty() {
697            Ok(())
698        } else {
699            Err(errors)
700        }
701    }
702
703    /// Stops all engine clients.
704    ///
705    /// Note: Async disconnection is handled by LiveNode for live clients.
706    /// This method only handles synchronous stop operations on execution clients.
707    fn stop_all_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
708        let mut errors = Vec::new();
709
710        {
711            let mut exec_engine = self.exec_engine.borrow_mut();
712            let exec_adapters = exec_engine.get_clients_mut();
713
714            for adapter in exec_adapters {
715                if let Err(e) = adapter.stop() {
716                    log::error!("Error stopping execution client {}: {e}", adapter.client_id);
717                    errors.push(e);
718                }
719            }
720        }
721
722        if errors.is_empty() {
723            Ok(())
724        } else {
725            Err(errors)
726        }
727    }
728
729    /// Stops engine clients.
730    fn stop_clients(&self) {
731        self.data_engine.borrow_mut().stop();
732    }
733
734    /// Connects all engine clients.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if any client fails to connect.
739    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
740    pub async fn connect_clients(&mut self) -> anyhow::Result<()> {
741        log::info!("Connecting clients...");
742        self.data_engine.borrow_mut().connect().await?;
743        self.exec_engine.borrow_mut().connect().await?;
744        Ok(())
745    }
746
747    /// Disconnects all engine clients.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if any client fails to disconnect.
752    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
753    pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
754        log::info!("Disconnecting clients...");
755        self.data_engine.borrow_mut().disconnect().await?;
756        self.exec_engine.borrow_mut().disconnect().await?;
757        Ok(())
758    }
759
760    /// Initializes the portfolio (orders & positions).
761    const fn initialize_portfolio(&self) {
762        // TODO: Placeholder: portfolio initialization to be implemented in next pass
763    }
764
765    /// Awaits execution engine state reconciliation.
766    ///
767    /// Blocks until executions are reconciled or timeout.
768    const fn await_execution_reconciliation(&self) {
769        // TODO: await execution reconciliation with timeout
770    }
771
772    /// Awaits portfolio initialization.
773    ///
774    /// Blocks until portfolio is initialized or timeout.
775    const fn await_portfolio_initialized(&self) {
776        // TODO: await portfolio initialization with timeout
777    }
778
779    /// Awaits post-stop trader residual events.
780    ///
781    /// Allows final cleanup before full shutdown.
782    const fn await_trader_residuals(&self) {
783        // TODO: await trader residual events after stop
784    }
785
786    /// Returns `true` if all engine clients are connected.
787    #[must_use]
788    pub fn check_engines_connected(&self) -> bool {
789        self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
790    }
791
792    /// Returns `true` if all engine clients are disconnected.
793    #[must_use]
794    pub fn check_engines_disconnected(&self) -> bool {
795        self.data_engine.borrow().check_disconnected()
796            && self.exec_engine.borrow().check_disconnected()
797    }
798
799    /// Checks if the portfolio has been initialized.
800    const fn check_portfolio_initialized(&self) {
801        // TODO: check portfolio initialized status
802    }
803
804    /// Flushes the stream writer.
805    const fn flush_writer(&self) {
806        // TODO: No writer in this kernel version; placeholder for future streaming
807    }
808}