nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    fmt::Debug,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
25use nautilus_common::{
26    actor::{Actor, DataActor},
27    component::Component,
28    enums::{Environment, LogColor},
29    log_info,
30    messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
31    timer::TimeEventHandlerV2,
32};
33use nautilus_core::UUID4;
34use nautilus_model::{
35    events::OrderEventAny,
36    identifiers::{StrategyId, TraderId},
37};
38use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
39use nautilus_trading::strategy::Strategy;
40
41use crate::{
42    builder::LiveNodeBuilder,
43    config::LiveNodeConfig,
44    manager::{ExecutionManager, ExecutionManagerConfig},
45    runner::{AsyncRunner, AsyncRunnerChannels},
46};
47
/// Lifecycle phases that a `LiveNode` runner moves through.
///
/// Each variant has an explicit `u8` discriminant so the state can be stored in an
/// `AtomicU8` and shared across threads.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    /// Not yet started (the default).
    #[default]
    Idle = 0,
    /// Startup is in progress.
    Starting = 1,
    /// Fully started.
    Running = 2,
    /// Shutdown has been initiated.
    ShuttingDown = 3,
    /// Shutdown has completed.
    Stopped = 4,
}

impl NodeState {
    /// Converts a raw discriminant back into a `NodeState`.
    ///
    /// # Panics
    ///
    /// Panics when `value` is outside the valid discriminant range `0..=4`.
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        // Indexed by discriminant: VARIANTS[n] has discriminant n.
        const VARIANTS: [NodeState; 5] = [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ];

        let index = value as usize;
        if index < VARIANTS.len() {
            VARIANTS[index]
        } else {
            panic!("Invalid NodeState value")
        }
    }

    /// Returns the raw `u8` discriminant of this state.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` only for [`NodeState::Running`].
    #[must_use]
    pub const fn is_running(&self) -> bool {
        (*self).as_u8() == Self::Running.as_u8()
    }
}
90
91/// A thread-safe handle to control a `LiveNode` from other threads.
92///
93/// This allows stopping and querying the node's state without requiring the
94/// node itself to be Send + Sync.
95#[derive(Clone, Debug)]
96pub struct LiveNodeHandle {
97    /// Atomic flag indicating if the node should stop.
98    pub(crate) stop_flag: Arc<AtomicBool>,
99    /// Atomic state as `NodeState::as_u8()`.
100    pub(crate) state: Arc<AtomicU8>,
101}
102
103impl Default for LiveNodeHandle {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109impl LiveNodeHandle {
110    /// Creates a new handle with default (`Idle`) state.
111    #[must_use]
112    pub fn new() -> Self {
113        Self {
114            stop_flag: Arc::new(AtomicBool::new(false)),
115            state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
116        }
117    }
118
119    /// Sets the node state (internal use).
120    pub(crate) fn set_state(&self, state: NodeState) {
121        self.state.store(state.as_u8(), Ordering::Relaxed);
122        if state == NodeState::Running {
123            // Clear stop flag when entering running state
124            self.stop_flag.store(false, Ordering::Relaxed);
125        }
126    }
127
128    /// Returns the current node state.
129    #[must_use]
130    pub fn state(&self) -> NodeState {
131        NodeState::from_u8(self.state.load(Ordering::Relaxed))
132    }
133
134    /// Returns whether the node should stop.
135    #[must_use]
136    pub fn should_stop(&self) -> bool {
137        self.stop_flag.load(Ordering::Relaxed)
138    }
139
140    /// Returns whether the node is currently running.
141    #[must_use]
142    pub fn is_running(&self) -> bool {
143        self.state().is_running()
144    }
145
146    /// Signals the node to stop.
147    pub fn stop(&self) {
148        self.stop_flag.store(true, Ordering::Relaxed);
149    }
150}
151
152/// High-level abstraction for a live Nautilus system node.
153///
154/// Provides a simplified interface for running live systems
155/// with automatic client management and lifecycle handling.
156#[derive(Debug)]
157#[cfg_attr(
158    feature = "python",
159    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
160)]
161pub struct LiveNode {
162    kernel: NautilusKernel,
163    runner: Option<AsyncRunner>,
164    config: LiveNodeConfig,
165    handle: LiveNodeHandle,
166    exec_manager: ExecutionManager,
167    shutdown_deadline: Option<tokio::time::Instant>,
168    #[cfg(feature = "python")]
169    #[allow(dead_code)] // TODO: Under development
170    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
171}
172
173impl LiveNode {
174    /// Creates a new `LiveNode` from builder components.
175    ///
176    /// This is an internal constructor used by `LiveNodeBuilder`.
177    #[must_use]
178    pub(crate) fn new_from_builder(
179        kernel: NautilusKernel,
180        runner: AsyncRunner,
181        config: LiveNodeConfig,
182        exec_manager: ExecutionManager,
183    ) -> Self {
184        Self {
185            kernel,
186            runner: Some(runner),
187            config,
188            handle: LiveNodeHandle::new(),
189            exec_manager,
190            shutdown_deadline: None,
191            #[cfg(feature = "python")]
192            python_actors: Vec::new(),
193        }
194    }
195
196    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the environment is invalid for live trading.
201    pub fn builder(
202        trader_id: TraderId,
203        environment: Environment,
204    ) -> anyhow::Result<LiveNodeBuilder> {
205        LiveNodeBuilder::new(trader_id, environment)
206    }
207
208    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
209    ///
210    /// This is a convenience method for creating a live node with a pre-configured
211    /// kernel configuration, bypassing the builder pattern. If no config is provided,
212    /// a default configuration will be used.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if kernel construction fails.
217    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
218        let mut config = config.unwrap_or_default();
219        config.environment = Environment::Live;
220
221        match config.environment() {
222            Environment::Sandbox | Environment::Live => {}
223            Environment::Backtest => {
224                anyhow::bail!("LiveNode cannot be used with Backtest environment");
225            }
226        }
227
228        let runner = AsyncRunner::new();
229        let kernel = NautilusKernel::new(name, config.clone())?;
230
231        let exec_manager_config =
232            ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
233        let exec_manager = ExecutionManager::new(
234            kernel.clock.clone(),
235            kernel.cache.clone(),
236            exec_manager_config,
237        );
238
239        log::info!("LiveNode built successfully with kernel config");
240
241        Ok(Self {
242            kernel,
243            runner: Some(runner),
244            config,
245            handle: LiveNodeHandle::new(),
246            exec_manager,
247            shutdown_deadline: None,
248            #[cfg(feature = "python")]
249            python_actors: Vec::new(),
250        })
251    }
252
253    /// Returns a thread-safe handle to control this node.
254    #[must_use]
255    pub fn handle(&self) -> LiveNodeHandle {
256        self.handle.clone()
257    }
258
259    /// Starts the live node.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if startup fails.
264    pub async fn start(&mut self) -> anyhow::Result<()> {
265        if self.state().is_running() {
266            anyhow::bail!("Already running");
267        }
268
269        self.handle.set_state(NodeState::Starting);
270
271        self.kernel.start_async().await;
272        self.kernel.connect_clients().await?;
273        self.await_engines_connected().await?;
274        self.perform_startup_reconciliation().await?;
275
276        self.handle.set_state(NodeState::Running);
277
278        Ok(())
279    }
280
281    /// Stop the live node.
282    ///
283    /// This method stops the trader, waits for the configured grace period to allow
284    /// residual events to be processed, then finalizes the shutdown sequence.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if shutdown fails.
289    pub async fn stop(&mut self) -> anyhow::Result<()> {
290        if !self.state().is_running() {
291            anyhow::bail!("Not running");
292        }
293
294        self.handle.set_state(NodeState::ShuttingDown);
295
296        self.kernel.stop_trader();
297        let delay = self.kernel.delay_post_stop();
298        log::info!("Awaiting residual events ({delay:?})...");
299
300        tokio::time::sleep(delay).await;
301        self.finalize_stop().await
302    }
303
304    /// Awaits engine clients to connect with timeout.
305    async fn await_engines_connected(&self) -> anyhow::Result<()> {
306        let start = Instant::now();
307        let timeout = self.config.timeout_connection;
308        let interval = Duration::from_millis(100);
309
310        while start.elapsed() < timeout {
311            if self.kernel.check_engines_connected() {
312                log::info!("All engine clients connected");
313                return Ok(());
314            }
315            tokio::time::sleep(interval).await;
316        }
317
318        anyhow::bail!("Timeout waiting for engine clients to connect after {timeout:?}")
319    }
320
321    /// Awaits engine clients to disconnect with timeout.
322    async fn await_engines_disconnected(&self) -> anyhow::Result<()> {
323        let start = Instant::now();
324        let timeout = self.config.timeout_disconnection;
325        let interval = Duration::from_millis(100);
326
327        while start.elapsed() < timeout {
328            if self.kernel.check_engines_disconnected() {
329                log::info!("All engine clients disconnected");
330                return Ok(());
331            }
332            tokio::time::sleep(interval).await;
333        }
334
335        anyhow::bail!("Timeout waiting for engine clients to disconnect after {timeout:?}")
336    }
337
338    /// Performs startup reconciliation to align internal state with venue state.
339    ///
340    /// This method queries each execution client for mass status (orders, fills, positions)
341    /// and reconciles any discrepancies with the local cache state.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if reconciliation fails or times out.
346    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
347    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
348        if !self.config.exec_engine.reconciliation {
349            log::info!("Startup reconciliation disabled");
350            return Ok(());
351        }
352
353        log_info!(
354            "Starting execution state reconciliation...",
355            color = LogColor::Blue
356        );
357
358        let lookback_mins = self
359            .config
360            .exec_engine
361            .reconciliation_lookback_mins
362            .map(|m| m as u64);
363
364        let timeout = self.config.timeout_reconciliation;
365        let start = Instant::now();
366        let client_ids = self.kernel.exec_engine.borrow().client_ids();
367
368        for client_id in client_ids {
369            if start.elapsed() > timeout {
370                log::warn!("Reconciliation timeout reached, stopping early");
371                break;
372            }
373
374            log_info!(
375                "Requesting mass status from {}...",
376                client_id,
377                color = LogColor::Blue
378            );
379
380            let mass_status_result = self
381                .kernel
382                .exec_engine
383                .borrow_mut()
384                .generate_mass_status(&client_id, lookback_mins)
385                .await;
386
387            match mass_status_result {
388                Ok(Some(mass_status)) => {
389                    log_info!(
390                        "Reconciling ExecutionMassStatus for {}",
391                        client_id,
392                        color = LogColor::Blue
393                    );
394                    let events = self
395                        .exec_manager
396                        .reconcile_execution_mass_status(mass_status)
397                        .await;
398
399                    if events.is_empty() {
400                        log_info!(
401                            "Reconciliation for {} succeeded",
402                            client_id,
403                            color = LogColor::Blue
404                        );
405                    } else {
406                        log::info!(
407                            color = LogColor::Blue as u8;
408                            "Reconciliation for {} generated {} events",
409                            client_id,
410                            events.len()
411                        );
412
413                        let mut exec_engine = self.kernel.exec_engine.borrow_mut();
414                        for event in events {
415                            exec_engine.process(&event);
416                        }
417                    }
418                }
419                Ok(None) => {
420                    log::warn!(
421                        "No mass status available from {client_id} \
422                         (likely adapter error when generating reports)"
423                    );
424                }
425                Err(e) => {
426                    log::warn!("Failed to get mass status from {client_id}: {e}");
427                }
428            }
429        }
430
431        self.kernel.portfolio.borrow_mut().initialize_orders();
432        self.kernel.portfolio.borrow_mut().initialize_positions();
433
434        let elapsed_secs = start.elapsed().as_secs_f64();
435        log_info!(
436            "Startup reconciliation completed in {:.2}s",
437            elapsed_secs,
438            color = LogColor::Blue
439        );
440
441        Ok(())
442    }
443
444    /// Run the live node with automatic shutdown handling.
445    ///
446    /// This method starts the node, runs indefinitely, and handles graceful shutdown
447    /// on interrupt signals.
448    ///
449    /// # Thread Safety
450    ///
451    /// The event loop runs directly on the current thread (not spawned) because the
452    /// msgbus uses thread-local storage. Endpoints registered by the kernel are only
453    /// accessible from the same thread.
454    ///
455    /// # Shutdown Sequence
456    ///
457    /// 1. Signal received (SIGINT or handle stop).
458    /// 2. Trader components stopped (triggers order cancellations, etc.).
459    /// 3. Event loop continues processing residual events for the configured grace period.
460    /// 4. Kernel finalized, clients disconnected, remaining events drained.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the node fails to start or encounters a runtime error.
465    pub async fn run(&mut self) -> anyhow::Result<()> {
466        if self.state().is_running() {
467            anyhow::bail!("Already running");
468        }
469
470        let Some(runner) = self.runner.take() else {
471            anyhow::bail!("Runner already consumed - run() called twice");
472        };
473
474        let AsyncRunnerChannels {
475            mut time_evt_rx,
476            mut data_evt_rx,
477            mut data_cmd_rx,
478            mut exec_evt_rx,
479            mut exec_cmd_rx,
480        } = runner.take_channels();
481
482        log::info!("Event loop starting");
483
484        self.handle.set_state(NodeState::Starting);
485        self.kernel.start_async().await;
486
487        let stop_handle = self.handle.clone();
488        let mut pending = PendingEvents::default();
489
490        // Startup phase: process events while completing startup
491        // TODO: Add ctrl_c and stop_handle monitoring here to allow aborting a
492        // hanging startup. Currently signals during startup are ignored, and
493        // any pending stop_flag is cleared when transitioning to Running.
494        {
495            let startup_future = self.complete_startup();
496            tokio::pin!(startup_future);
497
498            loop {
499                tokio::select! {
500                    biased;
501
502                    result = &mut startup_future => {
503                        result?;
504                        // self.state is set to Running by complete_startup()
505                        break;
506                    }
507                    Some(handler) = time_evt_rx.recv() => {
508                        AsyncRunner::handle_time_event(handler);
509                    }
510                    Some(evt) = data_evt_rx.recv() => {
511                        pending.data_evts.push(evt);
512                    }
513                    Some(cmd) = data_cmd_rx.recv() => {
514                        pending.data_cmds.push(cmd);
515                    }
516                    Some(evt) = exec_evt_rx.recv() => {
517                        // Account and Report events are safe, order events conflict
518                        match evt {
519                            ExecutionEvent::Account(_) | ExecutionEvent::Report(_) => {
520                                AsyncRunner::handle_exec_event(evt);
521                            }
522                            ExecutionEvent::Order(order_evt) => {
523                                pending.order_evts.push(order_evt);
524                            }
525                        }
526                    }
527                    Some(cmd) = exec_cmd_rx.recv() => {
528                        pending.exec_cmds.push(cmd);
529                    }
530                }
531            }
532        }
533
534        pending.drain();
535
536        // Running phase: runs until shutdown deadline expires
537        let mut residual_events = 0usize;
538
539        loop {
540            let shutdown_deadline = self.shutdown_deadline;
541            let is_shutting_down = self.state() == NodeState::ShuttingDown;
542
543            tokio::select! {
544                Some(handler) = time_evt_rx.recv() => {
545                    AsyncRunner::handle_time_event(handler);
546                    if is_shutting_down {
547                        log::debug!("Residual time event");
548                        residual_events += 1;
549                    }
550                }
551                Some(evt) = data_evt_rx.recv() => {
552                    if is_shutting_down {
553                        log::debug!("Residual data event: {evt:?}");
554                        residual_events += 1;
555                    }
556                    AsyncRunner::handle_data_event(evt);
557                }
558                Some(cmd) = data_cmd_rx.recv() => {
559                    if is_shutting_down {
560                        log::debug!("Residual data command: {cmd:?}");
561                        residual_events += 1;
562                    }
563                    AsyncRunner::handle_data_command(cmd);
564                }
565                Some(evt) = exec_evt_rx.recv() => {
566                    if is_shutting_down {
567                        log::debug!("Residual exec event: {evt:?}");
568                        residual_events += 1;
569                    }
570                    AsyncRunner::handle_exec_event(evt);
571                }
572                Some(cmd) = exec_cmd_rx.recv() => {
573                    if is_shutting_down {
574                        log::debug!("Residual exec command: {cmd:?}");
575                        residual_events += 1;
576                    }
577                    AsyncRunner::handle_exec_command(cmd);
578                }
579                result = tokio::signal::ctrl_c(), if self.state() == NodeState::Running => {
580                    match result {
581                        Ok(()) => log::info!("Received SIGINT, shutting down"),
582                        Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
583                    }
584                    self.initiate_shutdown();
585                }
586                () = async {
587                    loop {
588                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
589                        if stop_handle.should_stop() {
590                            log::info!("Received stop signal from handle");
591                            return;
592                        }
593                    }
594                }, if self.state() == NodeState::Running => {
595                    self.initiate_shutdown();
596                }
597                () = async {
598                    match shutdown_deadline {
599                        Some(deadline) => tokio::time::sleep_until(deadline).await,
600                        None => std::future::pending::<()>().await,
601                    }
602                }, if self.state() == NodeState::ShuttingDown => {
603                    break;
604                }
605            }
606        }
607
608        if residual_events > 0 {
609            log::debug!("Processed {residual_events} residual events during shutdown");
610        }
611
612        let _ = self.kernel.cache().borrow().check_residuals();
613
614        self.finalize_stop().await?;
615
616        // Handle events that arrived during finalize_stop
617        self.drain_channels(
618            &mut time_evt_rx,
619            &mut data_evt_rx,
620            &mut data_cmd_rx,
621            &mut exec_evt_rx,
622            &mut exec_cmd_rx,
623        );
624
625        log::info!("Event loop stopped");
626
627        Ok(())
628    }
629
630    async fn complete_startup(&mut self) -> anyhow::Result<()> {
631        self.kernel.connect_clients().await?;
632        self.await_engines_connected().await?;
633        self.perform_startup_reconciliation().await?;
634
635        self.handle.set_state(NodeState::Running);
636
637        Ok(())
638    }
639
640    fn initiate_shutdown(&mut self) {
641        self.kernel.stop_trader();
642        let delay = self.kernel.delay_post_stop();
643        log::info!("Awaiting residual events ({delay:?})...");
644
645        self.shutdown_deadline = Some(tokio::time::Instant::now() + delay);
646        self.handle.set_state(NodeState::ShuttingDown);
647    }
648
649    async fn finalize_stop(&mut self) -> anyhow::Result<()> {
650        self.kernel.disconnect_clients().await?;
651        self.await_engines_disconnected().await?;
652        self.kernel.finalize_stop().await;
653
654        self.handle.set_state(NodeState::Stopped);
655
656        Ok(())
657    }
658
659    fn drain_channels(
660        &self,
661        time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
662        data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
663        data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
664        exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
665        exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
666    ) {
667        let mut drained = 0;
668
669        while let Ok(handler) = time_evt_rx.try_recv() {
670            AsyncRunner::handle_time_event(handler);
671            drained += 1;
672        }
673        while let Ok(cmd) = data_cmd_rx.try_recv() {
674            AsyncRunner::handle_data_command(cmd);
675            drained += 1;
676        }
677        while let Ok(evt) = data_evt_rx.try_recv() {
678            AsyncRunner::handle_data_event(evt);
679            drained += 1;
680        }
681        while let Ok(cmd) = exec_cmd_rx.try_recv() {
682            AsyncRunner::handle_exec_command(cmd);
683            drained += 1;
684        }
685        while let Ok(evt) = exec_evt_rx.try_recv() {
686            AsyncRunner::handle_exec_event(evt);
687            drained += 1;
688        }
689
690        if drained > 0 {
691            log::info!("Drained {drained} remaining events during shutdown");
692        }
693    }
694
695    /// Gets the node's environment.
696    #[must_use]
697    pub fn environment(&self) -> Environment {
698        self.kernel.environment()
699    }
700
701    /// Gets a reference to the underlying kernel.
702    #[must_use]
703    pub const fn kernel(&self) -> &NautilusKernel {
704        &self.kernel
705    }
706
707    /// Gets an exclusive reference to the underlying kernel.
708    #[must_use]
709    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
710        &mut self.kernel
711    }
712
713    /// Gets the node's trader ID.
714    #[must_use]
715    pub fn trader_id(&self) -> TraderId {
716        self.kernel.trader_id()
717    }
718
719    /// Gets the node's instance ID.
720    #[must_use]
721    pub const fn instance_id(&self) -> UUID4 {
722        self.kernel.instance_id()
723    }
724
725    /// Returns the current node state.
726    #[must_use]
727    pub fn state(&self) -> NodeState {
728        self.handle.state()
729    }
730
731    /// Checks if the live node is currently running.
732    #[must_use]
733    pub fn is_running(&self) -> bool {
734        self.state().is_running()
735    }
736
737    /// Gets a reference to the execution manager.
738    #[must_use]
739    pub const fn exec_manager(&self) -> &ExecutionManager {
740        &self.exec_manager
741    }
742
743    /// Gets an exclusive reference to the execution manager.
744    #[must_use]
745    pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
746        &mut self.exec_manager
747    }
748
749    /// Adds an actor to the trader.
750    ///
751    /// This method provides a high-level interface for adding actors to the underlying
752    /// trader without requiring direct access to the kernel. Actors should be added
753    /// after the node is built but before starting the node.
754    ///
755    /// # Errors
756    ///
757    /// Returns an error if:
758    /// - The trader is not in a valid state for adding components.
759    /// - An actor with the same ID is already registered.
760    /// - The node is currently running.
761    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
762    where
763        T: DataActor + Component + Actor + 'static,
764    {
765        if self.state() != NodeState::Idle {
766            anyhow::bail!(
767                "Cannot add actor while node is running, add actors before calling start()"
768            );
769        }
770
771        self.kernel.trader.add_actor(actor)
772    }
773
774    /// Adds an actor to the live node using a factory function.
775    ///
776    /// The factory function is called at registration time to create the actor,
777    /// avoiding cloning issues with non-cloneable actor types.
778    ///
779    /// # Errors
780    ///
781    /// Returns an error if:
782    /// - The node is currently running.
783    /// - The factory function fails to create the actor.
784    /// - The underlying trader registration fails.
785    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
786    where
787        F: FnOnce() -> anyhow::Result<T>,
788        T: DataActor + Component + Actor + 'static,
789    {
790        if self.state() != NodeState::Idle {
791            anyhow::bail!(
792                "Cannot add actor while node is running, add actors before calling start()"
793            );
794        }
795
796        self.kernel.trader.add_actor_from_factory(factory)
797    }
798
799    /// Adds a strategy to the trader.
800    ///
801    /// Strategies are registered in both the component registry (for lifecycle management)
802    /// and the actor registry (for data callbacks via msgbus).
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if:
807    /// - The node is currently running.
808    /// - A strategy with the same ID is already registered.
809    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
810    where
811        T: Strategy + Component + Debug + 'static,
812    {
813        if self.state() != NodeState::Idle {
814            anyhow::bail!(
815                "Cannot add strategy while node is running, add strategies before calling start()"
816            );
817        }
818
819        // Register external order claims before adding strategy (which moves it)
820        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
821        if let Some(claims) = strategy.external_order_claims() {
822            for instrument_id in claims {
823                self.exec_manager
824                    .claim_external_orders(instrument_id, strategy_id);
825            }
826            log_info!(
827                "Registered external order claims for {}: {:?}",
828                strategy_id,
829                strategy.external_order_claims(),
830                color = LogColor::Blue
831            );
832        }
833
834        self.kernel.trader.add_strategy(strategy)
835    }
836}
837
838/// Events queued during startup to avoid RefCell borrow conflicts.
839///
840/// During `connect_clients()`, the data_engine and exec_engine are borrowed
841/// across awaits. Processing commands/events that trigger msgbus handlers
842/// would try to borrow the same engines, causing a panic.
843#[derive(Default)]
844struct PendingEvents {
845    data_cmds: Vec<DataCommand>,
846    data_evts: Vec<DataEvent>,
847    exec_cmds: Vec<TradingCommand>,
848    order_evts: Vec<OrderEventAny>,
849}
850
851impl PendingEvents {
852    fn drain(&mut self) {
853        let total = self.data_evts.len()
854            + self.data_cmds.len()
855            + self.exec_cmds.len()
856            + self.order_evts.len();
857
858        if total > 0 {
859            log::debug!(
860                "Processing {total} events/commands queued during startup \
861                 (data_evts={}, data_cmds={}, exec_cmds={}, order_evts={})",
862                self.data_evts.len(),
863                self.data_cmds.len(),
864                self.exec_cmds.len(),
865                self.order_evts.len()
866            );
867        }
868
869        for evt in self.data_evts.drain(..) {
870            AsyncRunner::handle_data_event(evt);
871        }
872        for cmd in self.data_cmds.drain(..) {
873            AsyncRunner::handle_data_command(cmd);
874        }
875        for cmd in self.exec_cmds.drain(..) {
876            AsyncRunner::handle_exec_command(cmd);
877        }
878        for evt in self.order_evts.drain(..) {
879            AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
880        }
881    }
882}
883
#[cfg(test)]
mod tests {
    // `TraderId`, `Environment`, `UUID4`, `NodeState`, `LiveNode` and
    // `LiveNodeHandle` all come from the parent module via `super::*`.
    use rstest::*;

    use super::*;

    // ---------------------------------------------------------------------
    // NodeState
    // ---------------------------------------------------------------------

    #[rstest]
    #[case(0, NodeState::Idle)]
    #[case(1, NodeState::Starting)]
    #[case(2, NodeState::Running)]
    #[case(3, NodeState::ShuttingDown)]
    #[case(4, NodeState::Stopped)]
    fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
        assert_eq!(NodeState::from_u8(value), expected);
    }

    #[rstest]
    #[case(5)]
    #[case(255)]
    #[should_panic(expected = "Invalid NodeState value")]
    fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
        let _ = NodeState::from_u8(value);
    }

    #[rstest]
    fn test_node_state_roundtrip() {
        for state in [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ] {
            assert_eq!(NodeState::from_u8(state.as_u8()), state);
        }
    }

    #[rstest]
    fn test_node_state_is_running_only_for_running() {
        assert!(!NodeState::Idle.is_running());
        assert!(!NodeState::Starting.is_running());
        assert!(NodeState::Running.is_running());
        assert!(!NodeState::ShuttingDown.is_running());
        assert!(!NodeState::Stopped.is_running());
    }

    // ---------------------------------------------------------------------
    // LiveNodeHandle
    // ---------------------------------------------------------------------

    #[rstest]
    fn test_handle_initial_state() {
        let handle = LiveNodeHandle::new();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.should_stop());
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_stop_sets_flag() {
        let handle = LiveNodeHandle::new();

        handle.stop();

        assert!(handle.should_stop());
    }

    #[rstest]
    fn test_handle_set_state_running_clears_stop_flag() {
        let handle = LiveNodeHandle::new();
        handle.stop();
        assert!(handle.should_stop());

        handle.set_state(NodeState::Running);

        assert!(!handle.should_stop());
        assert!(handle.is_running());
        assert_eq!(handle.state(), NodeState::Running);
    }

    #[rstest]
    fn test_handle_node_state_transitions() {
        let handle = LiveNodeHandle::new();
        assert_eq!(handle.state(), NodeState::Idle);

        handle.set_state(NodeState::Starting);
        assert_eq!(handle.state(), NodeState::Starting);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Running);
        assert_eq!(handle.state(), NodeState::Running);
        assert!(handle.is_running());

        handle.set_state(NodeState::ShuttingDown);
        assert_eq!(handle.state(), NodeState::ShuttingDown);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Stopped);
        assert_eq!(handle.state(), NodeState::Stopped);
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_clone_shares_state_bidirectionally() {
        let handle1 = LiveNodeHandle::new();
        let handle2 = handle1.clone();

        // Mutation from handle1 visible in handle2
        handle1.stop();
        assert!(handle2.should_stop());

        // Mutation from handle2 visible in handle1
        handle2.set_state(NodeState::Running);
        assert_eq!(handle1.state(), NodeState::Running);
    }

    #[rstest]
    fn test_handle_stop_flag_independent_of_state() {
        let handle = LiveNodeHandle::new();

        // Stop flag can be set regardless of state
        handle.set_state(NodeState::Starting);
        handle.stop();
        assert!(handle.should_stop());
        assert_eq!(handle.state(), NodeState::Starting);

        // Only Running state clears the stop flag
        handle.set_state(NodeState::ShuttingDown);
        assert!(handle.should_stop()); // Still set

        handle.set_state(NodeState::Running);
        assert!(!handle.should_stop()); // Cleared
    }

    // ---------------------------------------------------------------------
    // LiveNodeBuilder
    // ---------------------------------------------------------------------

    // Replaces three tests that were partly byte-identical duplicates
    // (`test_builder_creation` and `test_builder_accepts_sandbox_environment`
    // both built a Sandbox node). Each accepted environment is covered once.
    #[rstest]
    #[case(Environment::Live)]
    #[case(Environment::Sandbox)]
    fn test_builder_accepts_non_backtest_environment(#[case] environment: Environment) {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), environment);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_builder_rejects_backtest() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Backtest"));
    }

    #[rstest]
    fn test_builder_fluent_api_chaining() {
        let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
            .unwrap()
            .with_name("TestNode")
            .with_instance_id(UUID4::new())
            .with_load_state(false)
            .with_save_state(true)
            .with_timeout_connection(30)
            .with_timeout_reconciliation(60)
            .with_reconciliation(true)
            .with_reconciliation_lookback_mins(120)
            .with_timeout_portfolio(10)
            .with_timeout_disconnection_secs(5)
            .with_delay_post_stop_secs(3)
            .with_delay_shutdown_secs(10);

        assert_eq!(builder.name(), "TestNode");
    }

    // ---------------------------------------------------------------------
    // LiveNode (requires the `python` feature to build)
    // ---------------------------------------------------------------------

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_build_and_initial_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        assert_eq!(node.state(), NodeState::Idle);
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
        assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_handle_reflects_node_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        let handle = node.handle();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.is_running());
    }
}