nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    fmt::Debug,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
25use nautilus_common::{
26    actor::{Actor, DataActor},
27    cache::database::CacheDatabaseAdapter,
28    component::Component,
29    enums::{Environment, LogColor},
30    log_info,
31    messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
32    timer::TimeEventHandler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36    events::OrderEventAny,
37    identifiers::{StrategyId, TraderId},
38};
39use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
40use nautilus_trading::strategy::Strategy;
41
42use crate::{
43    builder::LiveNodeBuilder,
44    config::LiveNodeConfig,
45    manager::{ExecutionManager, ExecutionManagerConfig},
46    runner::{AsyncRunner, AsyncRunnerChannels},
47};
48
/// Lifecycle state of the `LiveNode` runner.
///
/// Each variant has a fixed `u8` discriminant so the state can be stored in an
/// atomic byte and converted back with [`NodeState::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[repr(u8)]
pub enum NodeState {
    /// Initial state; also the `Default`.
    #[default]
    Idle = 0,
    /// Startup is in progress.
    Starting = 1,
    /// The node is running.
    Running = 2,
    /// Shutdown has been initiated but not finalized.
    ShuttingDown = 3,
    /// Shutdown has been finalized.
    Stopped = 4,
}

impl NodeState {
    /// Converts a raw `u8` discriminant back into a `NodeState`.
    ///
    /// # Panics
    ///
    /// Panics if the value is not a valid `NodeState` discriminant (0-4).
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        // Indexed by discriminant; the order must match the `= N` values on the enum.
        const BY_DISCRIMINANT: [NodeState; 5] = [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ];

        let index = value as usize;
        if index >= BY_DISCRIMINANT.len() {
            panic!("Invalid NodeState value");
        }
        BY_DISCRIMINANT[index]
    }

    /// Returns the raw `u8` discriminant of this state.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` only for the `Running` state.
    #[must_use]
    pub const fn is_running(&self) -> bool {
        match self {
            Self::Running => true,
            Self::Idle | Self::Starting | Self::ShuttingDown | Self::Stopped => false,
        }
    }
}
91
92/// A thread-safe handle to control a `LiveNode` from other threads.
93///
94/// This allows stopping and querying the node's state without requiring the
95/// node itself to be Send + Sync.
96#[derive(Clone, Debug)]
97pub struct LiveNodeHandle {
98    /// Atomic flag indicating if the node should stop.
99    pub(crate) stop_flag: Arc<AtomicBool>,
100    /// Atomic state as `NodeState::as_u8()`.
101    pub(crate) state: Arc<AtomicU8>,
102}
103
104impl Default for LiveNodeHandle {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl LiveNodeHandle {
111    /// Creates a new handle with default (`Idle`) state.
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            stop_flag: Arc::new(AtomicBool::new(false)),
116            state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
117        }
118    }
119
120    /// Sets the node state (internal use).
121    pub(crate) fn set_state(&self, state: NodeState) {
122        self.state.store(state.as_u8(), Ordering::Relaxed);
123        if state == NodeState::Running {
124            // Clear stop flag when entering running state
125            self.stop_flag.store(false, Ordering::Relaxed);
126        }
127    }
128
129    /// Returns the current node state.
130    #[must_use]
131    pub fn state(&self) -> NodeState {
132        NodeState::from_u8(self.state.load(Ordering::Relaxed))
133    }
134
135    /// Returns whether the node should stop.
136    #[must_use]
137    pub fn should_stop(&self) -> bool {
138        self.stop_flag.load(Ordering::Relaxed)
139    }
140
141    /// Returns whether the node is currently running.
142    #[must_use]
143    pub fn is_running(&self) -> bool {
144        self.state().is_running()
145    }
146
147    /// Signals the node to stop.
148    pub fn stop(&self) {
149        self.stop_flag.store(true, Ordering::Relaxed);
150    }
151}
152
153/// High-level abstraction for a live Nautilus system node.
154///
155/// Provides a simplified interface for running live systems
156/// with automatic client management and lifecycle handling.
157#[derive(Debug)]
158#[cfg_attr(
159    feature = "python",
160    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
161)]
162pub struct LiveNode {
163    kernel: NautilusKernel,
164    runner: Option<AsyncRunner>,
165    config: LiveNodeConfig,
166    handle: LiveNodeHandle,
167    exec_manager: ExecutionManager,
168    shutdown_deadline: Option<tokio::time::Instant>,
169    #[cfg(feature = "python")]
170    #[allow(dead_code)] // TODO: Under development
171    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
172}
173
174impl LiveNode {
175    /// Creates a new `LiveNode` from builder components.
176    ///
177    /// This is an internal constructor used by `LiveNodeBuilder`.
178    #[must_use]
179    pub(crate) fn new_from_builder(
180        kernel: NautilusKernel,
181        runner: AsyncRunner,
182        config: LiveNodeConfig,
183        exec_manager: ExecutionManager,
184    ) -> Self {
185        Self {
186            kernel,
187            runner: Some(runner),
188            config,
189            handle: LiveNodeHandle::new(),
190            exec_manager,
191            shutdown_deadline: None,
192            #[cfg(feature = "python")]
193            python_actors: Vec::new(),
194        }
195    }
196
197    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the environment is invalid for live trading.
202    pub fn builder(
203        trader_id: TraderId,
204        environment: Environment,
205    ) -> anyhow::Result<LiveNodeBuilder> {
206        LiveNodeBuilder::new(trader_id, environment)
207    }
208
209    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
210    ///
211    /// This is a convenience method for creating a live node with a pre-configured
212    /// kernel configuration, bypassing the builder pattern. If no config is provided,
213    /// a default configuration will be used.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if kernel construction fails.
218    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
219        let mut config = config.unwrap_or_default();
220        config.environment = Environment::Live;
221
222        match config.environment() {
223            Environment::Sandbox | Environment::Live => {}
224            Environment::Backtest => {
225                anyhow::bail!("LiveNode cannot be used with Backtest environment");
226            }
227        }
228
229        let runner = AsyncRunner::new();
230        let kernel = NautilusKernel::new(name, config.clone())?;
231
232        let exec_manager_config =
233            ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
234        let exec_manager = ExecutionManager::new(
235            kernel.clock.clone(),
236            kernel.cache.clone(),
237            exec_manager_config,
238        );
239
240        log::info!("LiveNode built successfully with kernel config");
241
242        Ok(Self {
243            kernel,
244            runner: Some(runner),
245            config,
246            handle: LiveNodeHandle::new(),
247            exec_manager,
248            shutdown_deadline: None,
249            #[cfg(feature = "python")]
250            python_actors: Vec::new(),
251        })
252    }
253
254    /// Returns a thread-safe handle to control this node.
255    #[must_use]
256    pub fn handle(&self) -> LiveNodeHandle {
257        self.handle.clone()
258    }
259
260    /// Starts the live node.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if startup fails.
265    pub async fn start(&mut self) -> anyhow::Result<()> {
266        if self.state().is_running() {
267            anyhow::bail!("Already running");
268        }
269
270        self.handle.set_state(NodeState::Starting);
271
272        self.kernel.start_async().await;
273        self.kernel.connect_clients().await?;
274        self.await_engines_connected().await?;
275
276        // Process pending data events before reconciliation and starting trader
277        if let Some(runner) = self.runner.as_mut() {
278            runner.drain_pending_data_events();
279        }
280
281        self.perform_startup_reconciliation().await?;
282
283        self.kernel.start_trader();
284
285        self.handle.set_state(NodeState::Running);
286
287        Ok(())
288    }
289
290    /// Stop the live node.
291    ///
292    /// This method stops the trader, waits for the configured grace period to allow
293    /// residual events to be processed, then finalizes the shutdown sequence.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if shutdown fails.
298    pub async fn stop(&mut self) -> anyhow::Result<()> {
299        if !self.state().is_running() {
300            anyhow::bail!("Not running");
301        }
302
303        self.handle.set_state(NodeState::ShuttingDown);
304
305        self.kernel.stop_trader();
306        let delay = self.kernel.delay_post_stop();
307        log::info!("Awaiting residual events ({delay:?})...");
308
309        tokio::time::sleep(delay).await;
310        self.finalize_stop().await
311    }
312
313    /// Awaits engine clients to connect with timeout.
314    async fn await_engines_connected(&self) -> anyhow::Result<()> {
315        let start = Instant::now();
316        let timeout = self.config.timeout_connection;
317        let interval = Duration::from_millis(100);
318
319        while start.elapsed() < timeout {
320            if self.kernel.check_engines_connected() {
321                log::info!("All engine clients connected");
322                return Ok(());
323            }
324            tokio::time::sleep(interval).await;
325        }
326
327        anyhow::bail!("Timeout waiting for engine clients to connect after {timeout:?}")
328    }
329
330    /// Awaits engine clients to disconnect with timeout.
331    async fn await_engines_disconnected(&self) -> anyhow::Result<()> {
332        let start = Instant::now();
333        let timeout = self.config.timeout_disconnection;
334        let interval = Duration::from_millis(100);
335
336        while start.elapsed() < timeout {
337            if self.kernel.check_engines_disconnected() {
338                log::info!("All engine clients disconnected");
339                return Ok(());
340            }
341            tokio::time::sleep(interval).await;
342        }
343
344        anyhow::bail!("Timeout waiting for engine clients to disconnect after {timeout:?}")
345    }
346
347    /// Performs startup reconciliation to align internal state with venue state.
348    ///
349    /// This method queries each execution client for mass status (orders, fills, positions)
350    /// and reconciles any discrepancies with the local cache state.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if reconciliation fails or times out.
355    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
356    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
357        if !self.config.exec_engine.reconciliation {
358            log::info!("Startup reconciliation disabled");
359            return Ok(());
360        }
361
362        log_info!(
363            "Starting execution state reconciliation...",
364            color = LogColor::Blue
365        );
366
367        let lookback_mins = self
368            .config
369            .exec_engine
370            .reconciliation_lookback_mins
371            .map(|m| m as u64);
372
373        let timeout = self.config.timeout_reconciliation;
374        let start = Instant::now();
375        let client_ids = self.kernel.exec_engine.borrow().client_ids();
376
377        for client_id in client_ids {
378            if start.elapsed() > timeout {
379                log::warn!("Reconciliation timeout reached, stopping early");
380                break;
381            }
382
383            log_info!(
384                "Requesting mass status from {}...",
385                client_id,
386                color = LogColor::Blue
387            );
388
389            let mass_status_result = self
390                .kernel
391                .exec_engine
392                .borrow_mut()
393                .generate_mass_status(&client_id, lookback_mins)
394                .await;
395
396            match mass_status_result {
397                Ok(Some(mass_status)) => {
398                    log_info!(
399                        "Reconciling ExecutionMassStatus for {}",
400                        client_id,
401                        color = LogColor::Blue
402                    );
403                    let events = self
404                        .exec_manager
405                        .reconcile_execution_mass_status(mass_status)
406                        .await;
407
408                    if events.is_empty() {
409                        log_info!(
410                            "Reconciliation for {} succeeded",
411                            client_id,
412                            color = LogColor::Blue
413                        );
414                    } else {
415                        log::info!(
416                            color = LogColor::Blue as u8;
417                            "Reconciliation for {} generated {} events",
418                            client_id,
419                            events.len()
420                        );
421
422                        let mut exec_engine = self.kernel.exec_engine.borrow_mut();
423                        for event in events {
424                            exec_engine.process(&event);
425                        }
426                    }
427                }
428                Ok(None) => {
429                    log::warn!(
430                        "No mass status available from {client_id} \
431                         (likely adapter error when generating reports)"
432                    );
433                }
434                Err(e) => {
435                    log::warn!("Failed to get mass status from {client_id}: {e}");
436                }
437            }
438        }
439
440        self.kernel.portfolio.borrow_mut().initialize_orders();
441        self.kernel.portfolio.borrow_mut().initialize_positions();
442
443        let elapsed_secs = start.elapsed().as_secs_f64();
444        log_info!(
445            "Startup reconciliation completed in {:.2}s",
446            elapsed_secs,
447            color = LogColor::Blue
448        );
449
450        Ok(())
451    }
452
453    /// Run the live node with automatic shutdown handling.
454    ///
455    /// This method starts the node, runs indefinitely, and handles graceful shutdown
456    /// on interrupt signals.
457    ///
458    /// # Thread Safety
459    ///
460    /// The event loop runs directly on the current thread (not spawned) because the
461    /// msgbus uses thread-local storage. Endpoints registered by the kernel are only
462    /// accessible from the same thread.
463    ///
464    /// # Shutdown Sequence
465    ///
466    /// 1. Signal received (SIGINT or handle stop).
467    /// 2. Trader components stopped (triggers order cancellations, etc.).
468    /// 3. Event loop continues processing residual events for the configured grace period.
469    /// 4. Kernel finalized, clients disconnected, remaining events drained.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if the node fails to start or encounters a runtime error.
474    pub async fn run(&mut self) -> anyhow::Result<()> {
475        if self.state().is_running() {
476            anyhow::bail!("Already running");
477        }
478
479        let Some(runner) = self.runner.take() else {
480            anyhow::bail!("Runner already consumed - run() called twice");
481        };
482
483        let AsyncRunnerChannels {
484            mut time_evt_rx,
485            mut data_evt_rx,
486            mut data_cmd_rx,
487            mut exec_evt_rx,
488            mut exec_cmd_rx,
489        } = runner.take_channels();
490
491        log::info!("Event loop starting");
492
493        self.handle.set_state(NodeState::Starting);
494        self.kernel.start_async().await;
495
496        let stop_handle = self.handle.clone();
497        let mut pending = PendingEvents::default();
498
499        // Startup phase: process events while completing startup
500        // TODO: Add ctrl_c and stop_handle monitoring here to allow aborting a
501        // hanging startup. Currently signals during startup are ignored, and
502        // any pending stop_flag is cleared when transitioning to Running.
503        {
504            let startup_future = self.complete_startup();
505            tokio::pin!(startup_future);
506
507            loop {
508                tokio::select! {
509                    biased;
510
511                    result = &mut startup_future => {
512                        result?;
513                        break;
514                    }
515                    Some(handler) = time_evt_rx.recv() => {
516                        AsyncRunner::handle_time_event(handler);
517                    }
518                    Some(evt) = data_evt_rx.recv() => {
519                        pending.data_evts.push(evt);
520                    }
521                    Some(cmd) = data_cmd_rx.recv() => {
522                        pending.data_cmds.push(cmd);
523                    }
524                    Some(evt) = exec_evt_rx.recv() => {
525                        // Account and Report events are safe, order events conflict
526                        match evt {
527                            ExecutionEvent::Account(_) | ExecutionEvent::Report(_) => {
528                                AsyncRunner::handle_exec_event(evt);
529                            }
530                            ExecutionEvent::Order(order_evt) => {
531                                pending.order_evts.push(order_evt);
532                            }
533                        }
534                    }
535                    Some(cmd) = exec_cmd_rx.recv() => {
536                        pending.exec_cmds.push(cmd);
537                    }
538                }
539            }
540        }
541
542        pending.drain();
543
544        // Now start trader - instruments are in cache after drain()
545        self.kernel.start_trader();
546        self.handle.set_state(NodeState::Running);
547
548        // Running phase: runs until shutdown deadline expires
549        let mut residual_events = 0usize;
550
551        loop {
552            let shutdown_deadline = self.shutdown_deadline;
553            let is_shutting_down = self.state() == NodeState::ShuttingDown;
554
555            tokio::select! {
556                Some(handler) = time_evt_rx.recv() => {
557                    AsyncRunner::handle_time_event(handler);
558                    if is_shutting_down {
559                        log::debug!("Residual time event");
560                        residual_events += 1;
561                    }
562                }
563                Some(evt) = data_evt_rx.recv() => {
564                    if is_shutting_down {
565                        log::debug!("Residual data event: {evt:?}");
566                        residual_events += 1;
567                    }
568                    AsyncRunner::handle_data_event(evt);
569                }
570                Some(cmd) = data_cmd_rx.recv() => {
571                    if is_shutting_down {
572                        log::debug!("Residual data command: {cmd:?}");
573                        residual_events += 1;
574                    }
575                    AsyncRunner::handle_data_command(cmd);
576                }
577                Some(evt) = exec_evt_rx.recv() => {
578                    if is_shutting_down {
579                        log::debug!("Residual exec event: {evt:?}");
580                        residual_events += 1;
581                    }
582                    AsyncRunner::handle_exec_event(evt);
583                }
584                Some(cmd) = exec_cmd_rx.recv() => {
585                    if is_shutting_down {
586                        log::debug!("Residual exec command: {cmd:?}");
587                        residual_events += 1;
588                    }
589                    AsyncRunner::handle_exec_command(cmd);
590                }
591                result = tokio::signal::ctrl_c(), if self.state() == NodeState::Running => {
592                    match result {
593                        Ok(()) => log::info!("Received SIGINT, shutting down"),
594                        Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
595                    }
596                    self.initiate_shutdown();
597                }
598                () = async {
599                    loop {
600                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
601                        if stop_handle.should_stop() {
602                            log::info!("Received stop signal from handle");
603                            return;
604                        }
605                    }
606                }, if self.state() == NodeState::Running => {
607                    self.initiate_shutdown();
608                }
609                () = async {
610                    match shutdown_deadline {
611                        Some(deadline) => tokio::time::sleep_until(deadline).await,
612                        None => std::future::pending::<()>().await,
613                    }
614                }, if self.state() == NodeState::ShuttingDown => {
615                    break;
616                }
617            }
618        }
619
620        if residual_events > 0 {
621            log::debug!("Processed {residual_events} residual events during shutdown");
622        }
623
624        let _ = self.kernel.cache().borrow().check_residuals();
625
626        self.finalize_stop().await?;
627
628        // Handle events that arrived during finalize_stop
629        self.drain_channels(
630            &mut time_evt_rx,
631            &mut data_evt_rx,
632            &mut data_cmd_rx,
633            &mut exec_evt_rx,
634            &mut exec_cmd_rx,
635        );
636
637        log::info!("Event loop stopped");
638
639        Ok(())
640    }
641
642    async fn complete_startup(&mut self) -> anyhow::Result<()> {
643        self.kernel.connect_clients().await?;
644        self.await_engines_connected().await?;
645        self.perform_startup_reconciliation().await?;
646        Ok(())
647    }
648
649    fn initiate_shutdown(&mut self) {
650        self.kernel.stop_trader();
651        let delay = self.kernel.delay_post_stop();
652        log::info!("Awaiting residual events ({delay:?})...");
653
654        self.shutdown_deadline = Some(tokio::time::Instant::now() + delay);
655        self.handle.set_state(NodeState::ShuttingDown);
656    }
657
658    async fn finalize_stop(&mut self) -> anyhow::Result<()> {
659        self.kernel.disconnect_clients().await?;
660        self.await_engines_disconnected().await?;
661        self.kernel.finalize_stop().await;
662
663        self.handle.set_state(NodeState::Stopped);
664
665        Ok(())
666    }
667
668    fn drain_channels(
669        &self,
670        time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
671        data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
672        data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
673        exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
674        exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
675    ) {
676        let mut drained = 0;
677
678        while let Ok(handler) = time_evt_rx.try_recv() {
679            AsyncRunner::handle_time_event(handler);
680            drained += 1;
681        }
682        while let Ok(cmd) = data_cmd_rx.try_recv() {
683            AsyncRunner::handle_data_command(cmd);
684            drained += 1;
685        }
686        while let Ok(evt) = data_evt_rx.try_recv() {
687            AsyncRunner::handle_data_event(evt);
688            drained += 1;
689        }
690        while let Ok(cmd) = exec_cmd_rx.try_recv() {
691            AsyncRunner::handle_exec_command(cmd);
692            drained += 1;
693        }
694        while let Ok(evt) = exec_evt_rx.try_recv() {
695            AsyncRunner::handle_exec_event(evt);
696            drained += 1;
697        }
698
699        if drained > 0 {
700            log::info!("Drained {drained} remaining events during shutdown");
701        }
702    }
703
704    /// Gets the node's environment.
705    #[must_use]
706    pub fn environment(&self) -> Environment {
707        self.kernel.environment()
708    }
709
710    /// Gets a reference to the underlying kernel.
711    #[must_use]
712    pub const fn kernel(&self) -> &NautilusKernel {
713        &self.kernel
714    }
715
716    /// Gets an exclusive reference to the underlying kernel.
717    #[must_use]
718    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
719        &mut self.kernel
720    }
721
722    /// Gets the node's trader ID.
723    #[must_use]
724    pub fn trader_id(&self) -> TraderId {
725        self.kernel.trader_id()
726    }
727
728    /// Gets the node's instance ID.
729    #[must_use]
730    pub const fn instance_id(&self) -> UUID4 {
731        self.kernel.instance_id()
732    }
733
734    /// Returns the current node state.
735    #[must_use]
736    pub fn state(&self) -> NodeState {
737        self.handle.state()
738    }
739
740    /// Checks if the live node is currently running.
741    #[must_use]
742    pub fn is_running(&self) -> bool {
743        self.state().is_running()
744    }
745
746    /// Sets the cache database adapter for persistence.
747    ///
748    /// This allows setting a database adapter (e.g., PostgreSQL, Redis) after the node
749    /// is built but before it starts running. The database adapter is used to persist
750    /// cache data for recovery and state management.
751    ///
752    /// # Errors
753    ///
754    /// Returns an error if the node is already running.
755    pub fn set_cache_database(
756        &mut self,
757        database: Box<dyn CacheDatabaseAdapter>,
758    ) -> anyhow::Result<()> {
759        if self.state() != NodeState::Idle {
760            anyhow::bail!(
761                "Cannot set cache database while node is running, set it before calling start()"
762            );
763        }
764
765        self.kernel.cache().borrow_mut().set_database(database);
766        Ok(())
767    }
768
769    /// Gets a reference to the execution manager.
770    #[must_use]
771    pub const fn exec_manager(&self) -> &ExecutionManager {
772        &self.exec_manager
773    }
774
775    /// Gets an exclusive reference to the execution manager.
776    #[must_use]
777    pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
778        &mut self.exec_manager
779    }
780
781    /// Adds an actor to the trader.
782    ///
783    /// This method provides a high-level interface for adding actors to the underlying
784    /// trader without requiring direct access to the kernel. Actors should be added
785    /// after the node is built but before starting the node.
786    ///
787    /// # Errors
788    ///
789    /// Returns an error if:
790    /// - The trader is not in a valid state for adding components.
791    /// - An actor with the same ID is already registered.
792    /// - The node is currently running.
793    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
794    where
795        T: DataActor + Component + Actor + 'static,
796    {
797        if self.state() != NodeState::Idle {
798            anyhow::bail!(
799                "Cannot add actor while node is running, add actors before calling start()"
800            );
801        }
802
803        self.kernel.trader.add_actor(actor)
804    }
805
806    /// Adds an actor to the live node using a factory function.
807    ///
808    /// The factory function is called at registration time to create the actor,
809    /// avoiding cloning issues with non-cloneable actor types.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if:
814    /// - The node is currently running.
815    /// - The factory function fails to create the actor.
816    /// - The underlying trader registration fails.
817    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
818    where
819        F: FnOnce() -> anyhow::Result<T>,
820        T: DataActor + Component + Actor + 'static,
821    {
822        if self.state() != NodeState::Idle {
823            anyhow::bail!(
824                "Cannot add actor while node is running, add actors before calling start()"
825            );
826        }
827
828        self.kernel.trader.add_actor_from_factory(factory)
829    }
830
831    /// Adds a strategy to the trader.
832    ///
833    /// Strategies are registered in both the component registry (for lifecycle management)
834    /// and the actor registry (for data callbacks via msgbus).
835    ///
836    /// # Errors
837    ///
838    /// Returns an error if:
839    /// - The node is currently running.
840    /// - A strategy with the same ID is already registered.
841    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
842    where
843        T: Strategy + Component + Debug + 'static,
844    {
845        if self.state() != NodeState::Idle {
846            anyhow::bail!(
847                "Cannot add strategy while node is running, add strategies before calling start()"
848            );
849        }
850
851        // Register external order claims before adding strategy (which moves it)
852        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
853        if let Some(claims) = strategy.external_order_claims() {
854            for instrument_id in claims {
855                self.exec_manager
856                    .claim_external_orders(instrument_id, strategy_id);
857            }
858            log_info!(
859                "Registered external order claims for {}: {:?}",
860                strategy_id,
861                strategy.external_order_claims(),
862                color = LogColor::Blue
863            );
864        }
865
866        self.kernel.trader.add_strategy(strategy)
867    }
868}
869
870/// Events queued during startup to avoid RefCell borrow conflicts.
871///
872/// During `connect_clients()`, the data_engine and exec_engine are borrowed
873/// across awaits. Processing commands/events that trigger msgbus handlers
874/// would try to borrow the same engines, causing a panic.
875#[derive(Default)]
876struct PendingEvents {
877    data_cmds: Vec<DataCommand>,
878    data_evts: Vec<DataEvent>,
879    exec_cmds: Vec<TradingCommand>,
880    order_evts: Vec<OrderEventAny>,
881}
882
883impl PendingEvents {
884    fn drain(&mut self) {
885        let total = self.data_evts.len()
886            + self.data_cmds.len()
887            + self.exec_cmds.len()
888            + self.order_evts.len();
889
890        if total > 0 {
891            log::debug!(
892                "Processing {total} events/commands queued during startup \
893                 (data_evts={}, data_cmds={}, exec_cmds={}, order_evts={})",
894                self.data_evts.len(),
895                self.data_cmds.len(),
896                self.exec_cmds.len(),
897                self.order_evts.len()
898            );
899        }
900
901        for evt in self.data_evts.drain(..) {
902            AsyncRunner::handle_data_event(evt);
903        }
904        for cmd in self.data_cmds.drain(..) {
905            AsyncRunner::handle_data_command(cmd);
906        }
907        for cmd in self.exec_cmds.drain(..) {
908            AsyncRunner::handle_exec_command(cmd);
909        }
910        for evt in self.order_evts.drain(..) {
911            AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
912        }
913    }
914}
915
916#[cfg(test)]
917mod tests {
918    use nautilus_model::identifiers::TraderId;
919    use rstest::*;
920
921    use super::*;
922
923    #[rstest]
924    #[case(0, NodeState::Idle)]
925    #[case(1, NodeState::Starting)]
926    #[case(2, NodeState::Running)]
927    #[case(3, NodeState::ShuttingDown)]
928    #[case(4, NodeState::Stopped)]
929    fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
930        assert_eq!(NodeState::from_u8(value), expected);
931    }
932
933    #[rstest]
934    #[case(5)]
935    #[case(255)]
936    #[should_panic(expected = "Invalid NodeState value")]
937    fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
938        let _ = NodeState::from_u8(value);
939    }
940
941    #[rstest]
942    fn test_node_state_roundtrip() {
943        for state in [
944            NodeState::Idle,
945            NodeState::Starting,
946            NodeState::Running,
947            NodeState::ShuttingDown,
948            NodeState::Stopped,
949        ] {
950            assert_eq!(NodeState::from_u8(state.as_u8()), state);
951        }
952    }
953
954    #[rstest]
955    fn test_node_state_is_running_only_for_running() {
956        assert!(!NodeState::Idle.is_running());
957        assert!(!NodeState::Starting.is_running());
958        assert!(NodeState::Running.is_running());
959        assert!(!NodeState::ShuttingDown.is_running());
960        assert!(!NodeState::Stopped.is_running());
961    }
962
963    #[rstest]
964    fn test_handle_initial_state() {
965        let handle = LiveNodeHandle::new();
966
967        assert_eq!(handle.state(), NodeState::Idle);
968        assert!(!handle.should_stop());
969        assert!(!handle.is_running());
970    }
971
972    #[rstest]
973    fn test_handle_stop_sets_flag() {
974        let handle = LiveNodeHandle::new();
975
976        handle.stop();
977
978        assert!(handle.should_stop());
979    }
980
981    #[rstest]
982    fn test_handle_set_state_running_clears_stop_flag() {
983        let handle = LiveNodeHandle::new();
984        handle.stop();
985        assert!(handle.should_stop());
986
987        handle.set_state(NodeState::Running);
988
989        assert!(!handle.should_stop());
990        assert!(handle.is_running());
991        assert_eq!(handle.state(), NodeState::Running);
992    }
993
994    #[rstest]
995    fn test_handle_node_state_transitions() {
996        let handle = LiveNodeHandle::new();
997        assert_eq!(handle.state(), NodeState::Idle);
998
999        handle.set_state(NodeState::Starting);
1000        assert_eq!(handle.state(), NodeState::Starting);
1001        assert!(!handle.is_running());
1002
1003        handle.set_state(NodeState::Running);
1004        assert_eq!(handle.state(), NodeState::Running);
1005        assert!(handle.is_running());
1006
1007        handle.set_state(NodeState::ShuttingDown);
1008        assert_eq!(handle.state(), NodeState::ShuttingDown);
1009        assert!(!handle.is_running());
1010
1011        handle.set_state(NodeState::Stopped);
1012        assert_eq!(handle.state(), NodeState::Stopped);
1013        assert!(!handle.is_running());
1014    }
1015
1016    #[rstest]
1017    fn test_handle_clone_shares_state_bidirectionally() {
1018        let handle1 = LiveNodeHandle::new();
1019        let handle2 = handle1.clone();
1020
1021        // Mutation from handle1 visible in handle2
1022        handle1.stop();
1023        assert!(handle2.should_stop());
1024
1025        // Mutation from handle2 visible in handle1
1026        handle2.set_state(NodeState::Running);
1027        assert_eq!(handle1.state(), NodeState::Running);
1028    }
1029
1030    #[rstest]
1031    fn test_handle_stop_flag_independent_of_state() {
1032        let handle = LiveNodeHandle::new();
1033
1034        // Stop flag can be set regardless of state
1035        handle.set_state(NodeState::Starting);
1036        handle.stop();
1037        assert!(handle.should_stop());
1038        assert_eq!(handle.state(), NodeState::Starting);
1039
1040        // Only Running state clears the stop flag
1041        handle.set_state(NodeState::ShuttingDown);
1042        assert!(handle.should_stop()); // Still set
1043
1044        handle.set_state(NodeState::Running);
1045        assert!(!handle.should_stop()); // Cleared
1046    }
1047
1048    #[rstest]
1049    fn test_builder_creation() {
1050        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
1051
1052        assert!(result.is_ok());
1053    }
1054
1055    #[rstest]
1056    fn test_builder_rejects_backtest() {
1057        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);
1058
1059        assert!(result.is_err());
1060        assert!(result.unwrap_err().to_string().contains("Backtest"));
1061    }
1062
1063    #[rstest]
1064    fn test_builder_accepts_live_environment() {
1065        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);
1066
1067        assert!(result.is_ok());
1068    }
1069
1070    #[rstest]
1071    fn test_builder_accepts_sandbox_environment() {
1072        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
1073
1074        assert!(result.is_ok());
1075    }
1076
1077    #[rstest]
1078    fn test_builder_fluent_api_chaining() {
1079        let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
1080            .unwrap()
1081            .with_name("TestNode")
1082            .with_instance_id(UUID4::new())
1083            .with_load_state(false)
1084            .with_save_state(true)
1085            .with_timeout_connection(30)
1086            .with_timeout_reconciliation(60)
1087            .with_reconciliation(true)
1088            .with_reconciliation_lookback_mins(120)
1089            .with_timeout_portfolio(10)
1090            .with_timeout_disconnection_secs(5)
1091            .with_delay_post_stop_secs(3)
1092            .with_delay_shutdown_secs(10);
1093
1094        assert_eq!(builder.name(), "TestNode");
1095    }
1096
1097    #[cfg(feature = "python")]
1098    #[rstest]
1099    fn test_node_build_and_initial_state() {
1100        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
1101            .unwrap()
1102            .with_name("TestNode")
1103            .build()
1104            .unwrap();
1105
1106        assert_eq!(node.state(), NodeState::Idle);
1107        assert!(!node.is_running());
1108        assert_eq!(node.environment(), Environment::Sandbox);
1109        assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
1110    }
1111
1112    #[cfg(feature = "python")]
1113    #[rstest]
1114    fn test_node_handle_reflects_node_state() {
1115        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
1116            .unwrap()
1117            .with_name("TestNode")
1118            .build()
1119            .unwrap();
1120
1121        let handle = node.handle();
1122
1123        assert_eq!(handle.state(), NodeState::Idle);
1124        assert!(!handle.is_running());
1125    }
1126}