Skip to main content

nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    fmt::Debug,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
25use nautilus_common::{
26    actor::{Actor, DataActor},
27    cache::database::CacheDatabaseAdapter,
28    component::Component,
29    enums::{Environment, LogColor},
30    log_info,
31    messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
32    timer::TimeEventHandler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36    events::OrderEventAny,
37    identifiers::{StrategyId, TraderId},
38};
39use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
40use nautilus_trading::strategy::Strategy;
41use tabled::{Table, Tabled, settings::Style};
42
43use crate::{
44    builder::LiveNodeBuilder,
45    config::LiveNodeConfig,
46    manager::{ExecutionManager, ExecutionManagerConfig},
47    runner::{AsyncRunner, AsyncRunnerChannels},
48};
49
/// Lifecycle state of the `LiveNode` runner.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    /// Node constructed but not yet started (the default).
    #[default]
    Idle = 0,
    /// `start()`/`run()` invoked; kernel and clients are being brought up.
    Starting = 1,
    /// Startup complete; the event loop is processing normally.
    Running = 2,
    /// Shutdown initiated; residual events drain until the deadline expires.
    ShuttingDown = 3,
    /// Shutdown finalized; clients disconnected.
    Stopped = 4,
}
61
62impl NodeState {
63    /// Creates a `NodeState` from its `u8` representation.
64    ///
65    /// # Panics
66    ///
67    /// Panics if the value is not a valid `NodeState` discriminant (0-4).
68    #[must_use]
69    pub const fn from_u8(value: u8) -> Self {
70        match value {
71            0 => Self::Idle,
72            1 => Self::Starting,
73            2 => Self::Running,
74            3 => Self::ShuttingDown,
75            4 => Self::Stopped,
76            _ => panic!("Invalid NodeState value"),
77        }
78    }
79
80    /// Returns the `u8` representation of this state.
81    #[must_use]
82    pub const fn as_u8(self) -> u8 {
83        self as u8
84    }
85
86    /// Returns whether the state is `Running`.
87    #[must_use]
88    pub const fn is_running(&self) -> bool {
89        matches!(self, Self::Running)
90    }
91}
92
/// A thread-safe handle to control a `LiveNode` from other threads.
///
/// This allows stopping and querying the node's state without requiring the
/// node itself to be Send + Sync.
#[derive(Clone, Debug)]
pub struct LiveNodeHandle {
    /// Atomic flag indicating if the node should stop.
    ///
    /// Set by `stop()`; cleared again when the node enters `Running` (see
    /// `set_state`), so stale stop requests do not affect a new run.
    pub(crate) stop_flag: Arc<AtomicBool>,
    /// Atomic state as `NodeState::as_u8()`.
    ///
    /// Decoded back via `NodeState::from_u8` in `state()`.
    pub(crate) state: Arc<AtomicU8>,
}
104
105impl Default for LiveNodeHandle {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl LiveNodeHandle {
112    /// Creates a new handle with default (`Idle`) state.
113    #[must_use]
114    pub fn new() -> Self {
115        Self {
116            stop_flag: Arc::new(AtomicBool::new(false)),
117            state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
118        }
119    }
120
121    /// Sets the node state (internal use).
122    pub(crate) fn set_state(&self, state: NodeState) {
123        self.state.store(state.as_u8(), Ordering::Relaxed);
124        if state == NodeState::Running {
125            // Clear stop flag when entering running state
126            self.stop_flag.store(false, Ordering::Relaxed);
127        }
128    }
129
130    /// Returns the current node state.
131    #[must_use]
132    pub fn state(&self) -> NodeState {
133        NodeState::from_u8(self.state.load(Ordering::Relaxed))
134    }
135
136    /// Returns whether the node should stop.
137    #[must_use]
138    pub fn should_stop(&self) -> bool {
139        self.stop_flag.load(Ordering::Relaxed)
140    }
141
142    /// Returns whether the node is currently running.
143    #[must_use]
144    pub fn is_running(&self) -> bool {
145        self.state().is_running()
146    }
147
148    /// Signals the node to stop.
149    pub fn stop(&self) {
150        self.stop_flag.store(true, Ordering::Relaxed);
151    }
152}
153
/// High-level abstraction for a live Nautilus system node.
///
/// Provides a simplified interface for running live systems
/// with automatic client management and lifecycle handling.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
pub struct LiveNode {
    // Core system kernel owning the engines, cache, clock, and trader.
    kernel: NautilusKernel,
    // Taken (left as `None`) by `run()`, which consumes the runner's channels.
    runner: Option<AsyncRunner>,
    config: LiveNodeConfig,
    // Thread-safe lifecycle control, shared with callers via `handle()`.
    handle: LiveNodeHandle,
    exec_manager: ExecutionManager,
    // Deadline for draining residual events once shutdown has been initiated.
    shutdown_deadline: Option<tokio::time::Instant>,
    #[cfg(feature = "python")]
    #[allow(dead_code)] // TODO: Under development
    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
174
175impl LiveNode {
176    /// Creates a new `LiveNode` from builder components.
177    ///
178    /// This is an internal constructor used by `LiveNodeBuilder`.
179    #[must_use]
180    pub(crate) fn new_from_builder(
181        kernel: NautilusKernel,
182        runner: AsyncRunner,
183        config: LiveNodeConfig,
184        exec_manager: ExecutionManager,
185    ) -> Self {
186        Self {
187            kernel,
188            runner: Some(runner),
189            config,
190            handle: LiveNodeHandle::new(),
191            exec_manager,
192            shutdown_deadline: None,
193            #[cfg(feature = "python")]
194            python_actors: Vec::new(),
195        }
196    }
197
    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the environment is invalid for live trading.
    pub fn builder(
        trader_id: TraderId,
        environment: Environment,
    ) -> anyhow::Result<LiveNodeBuilder> {
        // Environment validation is delegated to `LiveNodeBuilder::new`.
        LiveNodeBuilder::new(trader_id, environment)
    }
209
    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
    ///
    /// This is a convenience method for creating a live node with a pre-configured
    /// kernel configuration, bypassing the builder pattern. If no config is provided,
    /// a default configuration will be used. The environment is always forced to
    /// `Environment::Live`, overriding whatever the supplied config specifies.
    ///
    /// # Errors
    ///
    /// Returns an error if kernel construction fails.
    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
        let mut config = config.unwrap_or_default();
        config.environment = Environment::Live;

        // NOTE(review): `environment` was just forced to `Live` above, so the
        // `Backtest` arm below can never match — confirm whether the override
        // or this validation reflects the intended behavior.
        match config.environment() {
            Environment::Sandbox | Environment::Live => {}
            Environment::Backtest => {
                anyhow::bail!("LiveNode cannot be used with Backtest environment");
            }
        }

        let runner = AsyncRunner::new();
        let kernel = NautilusKernel::new(name, config.clone())?;

        // The execution manager derives its settings from the exec engine
        // config and shares the kernel's clock and cache handles.
        let exec_manager_config =
            ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
        let exec_manager = ExecutionManager::new(
            kernel.clock.clone(),
            kernel.cache.clone(),
            exec_manager_config,
        );

        log::info!("LiveNode built successfully with kernel config");

        Ok(Self {
            kernel,
            runner: Some(runner),
            config,
            handle: LiveNodeHandle::new(),
            exec_manager,
            shutdown_deadline: None,
            #[cfg(feature = "python")]
            python_actors: Vec::new(),
        })
    }
254
    /// Returns a thread-safe handle to control this node.
    ///
    /// The handle shares the node's atomic state, so every clone observes and
    /// controls the same lifecycle (e.g. calling `stop()` from another thread).
    #[must_use]
    pub fn handle(&self) -> LiveNodeHandle {
        self.handle.clone()
    }
260
    /// Starts the live node.
    ///
    /// Brings up the kernel, connects clients, drains pending data events,
    /// runs startup reconciliation, and starts the trader.
    ///
    /// # Errors
    ///
    /// Returns an error if startup fails.
    pub async fn start(&mut self) -> anyhow::Result<()> {
        if self.state().is_running() {
            anyhow::bail!("Already running");
        }

        self.handle.set_state(NodeState::Starting);

        self.kernel.start_async().await;
        self.kernel.connect_clients().await;

        if !self.await_engines_connected().await {
            log::error!("Cannot start trader: engine client(s) not connected");
            // NOTE(review): on connection failure the node still transitions
            // to `Running` (without the trader started) and returns `Ok` —
            // mirrors `run()`'s behavior; confirm this is intended.
            self.handle.set_state(NodeState::Running);
            return Ok(());
        }

        // Process pending data events before reconciliation and starting trader
        if let Some(runner) = self.runner.as_mut() {
            runner.drain_pending_data_events();
        }

        self.perform_startup_reconciliation().await?;

        self.kernel.start_trader();

        self.handle.set_state(NodeState::Running);

        Ok(())
    }
295
    /// Stop the live node.
    ///
    /// This method stops the trader, waits for the configured grace period to allow
    /// residual events to be processed, then finalizes the shutdown sequence.
    ///
    /// # Errors
    ///
    /// Returns an error if shutdown fails.
    pub async fn stop(&mut self) -> anyhow::Result<()> {
        if !self.state().is_running() {
            anyhow::bail!("Not running");
        }

        self.handle.set_state(NodeState::ShuttingDown);

        self.kernel.stop_trader();
        // The grace period comes from the kernel, matching the post-stop
        // delay that `initiate_shutdown` uses in the `run` event loop.
        let delay = self.kernel.delay_post_stop();
        log::info!("Awaiting residual events ({delay:?})...");

        // NOTE(review): unlike `run()`, this path sleeps without processing
        // channel events during the grace period — confirm residual events
        // are dispatched elsewhere when `stop()` is called directly.
        tokio::time::sleep(delay).await;
        self.finalize_stop().await
    }
318
319    /// Awaits engine clients to connect with timeout.
320    ///
321    /// Returns `true` if all engines connected, `false` if timed out.
322    async fn await_engines_connected(&self) -> bool {
323        log::info!(
324            "Awaiting engine connections ({:?} timeout)...",
325            self.config.timeout_connection
326        );
327
328        let start = Instant::now();
329        let timeout = self.config.timeout_connection;
330        let interval = Duration::from_millis(100);
331
332        while start.elapsed() < timeout {
333            if self.kernel.check_engines_connected() {
334                log::info!("All engine clients connected");
335                return true;
336            }
337            tokio::time::sleep(interval).await;
338        }
339
340        self.log_connection_status();
341        false
342    }
343
344    /// Awaits engine clients to disconnect with timeout.
345    ///
346    /// Logs an error with client status on timeout but does not fail.
347    async fn await_engines_disconnected(&self) {
348        log::info!(
349            "Awaiting engine disconnections ({:?} timeout)...",
350            self.config.timeout_disconnection
351        );
352
353        let start = Instant::now();
354        let timeout = self.config.timeout_disconnection;
355        let interval = Duration::from_millis(100);
356
357        while start.elapsed() < timeout {
358            if self.kernel.check_engines_disconnected() {
359                log::info!("All engine clients disconnected");
360                return;
361            }
362            tokio::time::sleep(interval).await;
363        }
364
365        log::error!(
366            "Timed out ({:?}) waiting for engines to disconnect\n\
367             DataEngine.check_disconnected() == {}\n\
368             ExecEngine.check_disconnected() == {}",
369            timeout,
370            self.kernel.data_engine().check_disconnected(),
371            self.kernel.exec_engine().borrow().check_disconnected(),
372        );
373    }
374
375    fn log_connection_status(&self) {
376        #[derive(Tabled)]
377        struct ClientStatus {
378            #[tabled(rename = "Client")]
379            client: String,
380            #[tabled(rename = "Type")]
381            client_type: &'static str,
382            #[tabled(rename = "Connected")]
383            connected: bool,
384        }
385
386        let data_status = self.kernel.data_client_connection_status();
387        let exec_status = self.kernel.exec_client_connection_status();
388
389        let mut rows: Vec<ClientStatus> = Vec::new();
390
391        for (client_id, connected) in data_status {
392            rows.push(ClientStatus {
393                client: client_id.to_string(),
394                client_type: "Data",
395                connected,
396            });
397        }
398
399        for (client_id, connected) in exec_status {
400            rows.push(ClientStatus {
401                client: client_id.to_string(),
402                client_type: "Execution",
403                connected,
404            });
405        }
406
407        let table = Table::new(&rows).with(Style::rounded()).to_string();
408
409        log::warn!(
410            "Timed out ({:?}) waiting for engines to connect\n\n{table}\n\n\
411             DataEngine.check_connected() == {}\n\
412             ExecEngine.check_connected() == {}",
413            self.config.timeout_connection,
414            self.kernel.data_engine().check_connected(),
415            self.kernel.exec_engine().borrow().check_connected(),
416        );
417    }
418
    /// Performs startup reconciliation to align internal state with venue state.
    ///
    /// This method queries each execution client for mass status (orders, fills, positions)
    /// and reconciles any discrepancies with the local cache state.
    ///
    /// # Errors
    ///
    /// Returns an error if reconciliation fails or times out.
    #[allow(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
        if !self.config.exec_engine.reconciliation {
            log::info!("Startup reconciliation disabled");
            return Ok(());
        }

        log_info!(
            "Starting execution state reconciliation...",
            color = LogColor::Blue
        );

        // NOTE(review): `as u64` cast — assumes the configured lookback
        // minutes are non-negative and fit in u64; confirm the config type.
        let lookback_mins = self
            .config
            .exec_engine
            .reconciliation_lookback_mins
            .map(|m| m as u64);

        let timeout = self.config.timeout_reconciliation;
        let start = Instant::now();
        let client_ids = self.kernel.exec_engine.borrow().client_ids();

        for client_id in client_ids {
            // Timeout is only checked between clients; a single client's
            // mass-status request is not itself bounded here.
            if start.elapsed() > timeout {
                log::warn!("Reconciliation timeout reached, stopping early");
                break;
            }

            log_info!(
                "Requesting mass status from {}...",
                client_id,
                color = LogColor::Blue
            );

            let mass_status_result = self
                .kernel
                .exec_engine
                .borrow_mut()
                .generate_mass_status(&client_id, lookback_mins)
                .await;

            match mass_status_result {
                Ok(Some(mass_status)) => {
                    log_info!(
                        "Reconciling ExecutionMassStatus for {}",
                        client_id,
                        color = LogColor::Blue
                    );

                    // SAFETY: Do not hold the Rc across an await point
                    let exec_engine_rc = self.kernel.exec_engine.clone();

                    let result = self
                        .exec_manager
                        .reconcile_execution_mass_status(mass_status, exec_engine_rc)
                        .await;

                    // An empty event list means local and venue state agreed.
                    if result.events.is_empty() {
                        log_info!(
                            "Reconciliation for {} succeeded",
                            client_id,
                            color = LogColor::Blue
                        );
                    } else {
                        log::info!(
                            color = LogColor::Blue as u8;
                            "Reconciliation for {} processed {} events",
                            client_id,
                            result.events.len()
                        );
                    }

                    // Register external orders with execution clients for tracking
                    if !result.external_orders.is_empty() {
                        let exec_engine = self.kernel.exec_engine.borrow();
                        for external in result.external_orders {
                            exec_engine.register_external_order(
                                external.client_order_id,
                                external.venue_order_id,
                                external.instrument_id,
                                external.strategy_id,
                                external.ts_init,
                            );
                        }
                    }
                }
                Ok(None) => {
                    log::warn!(
                        "No mass status available from {client_id} \
                         (likely adapter error when generating reports)"
                    );
                }
                Err(e) => {
                    // Per-client failures are logged, not propagated, so one
                    // bad client does not abort reconciliation for the rest.
                    log::warn!("Failed to get mass status from {client_id}: {e}");
                }
            }
        }

        // Rebuild portfolio views from the (now reconciled) cache state.
        self.kernel.portfolio.borrow_mut().initialize_orders();
        self.kernel.portfolio.borrow_mut().initialize_positions();

        let elapsed_secs = start.elapsed().as_secs_f64();
        log_info!(
            "Startup reconciliation completed in {:.2}s",
            elapsed_secs,
            color = LogColor::Blue
        );

        Ok(())
    }
537
    /// Run the live node with automatic shutdown handling.
    ///
    /// This method starts the node, runs indefinitely, and handles graceful shutdown
    /// on interrupt signals.
    ///
    /// # Thread Safety
    ///
    /// The event loop runs directly on the current thread (not spawned) because the
    /// msgbus uses thread-local storage. Endpoints registered by the kernel are only
    /// accessible from the same thread.
    ///
    /// # Shutdown Sequence
    ///
    /// 1. Signal received (SIGINT or handle stop).
    /// 2. Trader components stopped (triggers order cancellations, etc.).
    /// 3. Event loop continues processing residual events for the configured grace period.
    /// 4. Kernel finalized, clients disconnected, remaining events drained.
    ///
    /// # Errors
    ///
    /// Returns an error if the node fails to start or encounters a runtime error.
    pub async fn run(&mut self) -> anyhow::Result<()> {
        if self.state().is_running() {
            anyhow::bail!("Already running");
        }

        // The runner is consumed here, making `run` single-use per node.
        let Some(runner) = self.runner.take() else {
            anyhow::bail!("Runner already consumed - run() called twice");
        };

        let AsyncRunnerChannels {
            mut time_evt_rx,
            mut data_evt_rx,
            mut data_cmd_rx,
            mut exec_evt_rx,
            mut exec_cmd_rx,
        } = runner.take_channels();

        log::info!("Event loop starting");

        self.handle.set_state(NodeState::Starting);
        self.kernel.start_async().await;

        let stop_handle = self.handle.clone();
        let mut pending = PendingEvents::default();

        // Startup phase: process events while completing startup
        // TODO: Add ctrl_c and stop_handle monitoring here to allow aborting a
        // hanging startup. Currently signals during startup are ignored, and
        // any pending stop_flag is cleared when transitioning to Running.
        let engines_connected = {
            let startup_future = self.complete_startup();
            tokio::pin!(startup_future);

            loop {
                tokio::select! {
                    // `biased` polls branches in declaration order, so startup
                    // completion is always observed before queued events.
                    biased;

                    result = &mut startup_future => {
                        break result?;
                    }
                    // Time events are dispatched immediately even during
                    // startup; the other event kinds are buffered below.
                    Some(handler) = time_evt_rx.recv() => {
                        AsyncRunner::handle_time_event(handler);
                    }
                    Some(evt) = data_evt_rx.recv() => {
                        pending.data_evts.push(evt);
                    }
                    Some(cmd) = data_cmd_rx.recv() => {
                        pending.data_cmds.push(cmd);
                    }
                    Some(evt) = exec_evt_rx.recv() => {
                        // Account and Report events are safe, order events conflict
                        match evt {
                            ExecutionEvent::Account(_) | ExecutionEvent::Report(_) => {
                                AsyncRunner::handle_exec_event(evt);
                            }
                            ExecutionEvent::Order(order_evt) => {
                                pending.order_evts.push(order_evt);
                            }
                        }
                    }
                    Some(cmd) = exec_cmd_rx.recv() => {
                        pending.exec_cmds.push(cmd);
                    }
                }
            }
        };

        // Replay everything buffered while startup was in flight.
        pending.drain();

        if engines_connected {
            // Run reconciliation now that instruments are in cache and start trader
            self.perform_startup_reconciliation().await?;
            self.kernel.start_trader();
        } else {
            log::error!("Not starting trader: engine client(s) not connected");
        }

        self.handle.set_state(NodeState::Running);

        // Running phase: runs until shutdown deadline expires
        let mut residual_events = 0usize;

        loop {
            // Snapshot per-iteration so the select! guards below see a
            // consistent view of the shutdown state.
            let shutdown_deadline = self.shutdown_deadline;
            let is_shutting_down = self.state() == NodeState::ShuttingDown;

            tokio::select! {
                Some(handler) = time_evt_rx.recv() => {
                    AsyncRunner::handle_time_event(handler);
                    if is_shutting_down {
                        log::debug!("Residual time event");
                        residual_events += 1;
                    }
                }
                Some(evt) = data_evt_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual data event: {evt:?}");
                        residual_events += 1;
                    }
                    AsyncRunner::handle_data_event(evt);
                }
                Some(cmd) = data_cmd_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual data command: {cmd:?}");
                        residual_events += 1;
                    }
                    AsyncRunner::handle_data_command(cmd);
                }
                Some(evt) = exec_evt_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual exec event: {evt:?}");
                        residual_events += 1;
                    }
                    AsyncRunner::handle_exec_event(evt);
                }
                Some(cmd) = exec_cmd_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual exec command: {cmd:?}");
                        residual_events += 1;
                    }
                    AsyncRunner::handle_exec_command(cmd);
                }
                // Signal/stop branches are guarded on `Running` so they are
                // disabled once shutdown has already been initiated.
                result = tokio::signal::ctrl_c(), if self.state() == NodeState::Running => {
                    match result {
                        Ok(()) => log::info!("Received SIGINT, shutting down"),
                        Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
                    }
                    self.initiate_shutdown();
                }
                // Poll the shared handle's stop flag at 100ms granularity.
                () = async {
                    loop {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                        if stop_handle.should_stop() {
                            log::info!("Received stop signal from handle");
                            return;
                        }
                    }
                }, if self.state() == NodeState::Running => {
                    self.initiate_shutdown();
                }
                // Once shutting down, exit the loop when the grace-period
                // deadline (armed by `initiate_shutdown`) expires.
                () = async {
                    match shutdown_deadline {
                        Some(deadline) => tokio::time::sleep_until(deadline).await,
                        None => std::future::pending::<()>().await,
                    }
                }, if self.state() == NodeState::ShuttingDown => {
                    break;
                }
            }
        }

        if residual_events > 0 {
            log::debug!("Processed {residual_events} residual events during shutdown");
        }

        // Result deliberately discarded: residual check is informational here.
        let _ = self.kernel.cache().borrow().check_residuals();

        self.finalize_stop().await?;

        // Handle events that arrived during finalize_stop
        self.drain_channels(
            &mut time_evt_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        log::info!("Event loop stopped");

        Ok(())
    }
731
732    /// Returns `true` if all engines connected successfully, `false` otherwise.
733    /// Note: Does NOT run reconciliation - that happens after pending events are drained.
734    async fn complete_startup(&mut self) -> anyhow::Result<bool> {
735        self.kernel.connect_clients().await;
736
737        if !self.await_engines_connected().await {
738            return Ok(false);
739        }
740
741        Ok(true)
742    }
743
    /// Stops the trader and arms the shutdown deadline; the `run` event loop
    /// keeps draining residual events until that deadline expires.
    fn initiate_shutdown(&mut self) {
        self.kernel.stop_trader();
        let delay = self.kernel.delay_post_stop();
        log::info!("Awaiting residual events ({delay:?})...");

        self.shutdown_deadline = Some(tokio::time::Instant::now() + delay);
        self.handle.set_state(NodeState::ShuttingDown);
    }
752
    /// Finalizes the shutdown sequence: disconnects clients, waits for engine
    /// disconnection, finalizes the kernel, then marks the node `Stopped`.
    ///
    /// # Errors
    ///
    /// Returns an error if disconnecting clients fails.
    async fn finalize_stop(&mut self) -> anyhow::Result<()> {
        self.kernel.disconnect_clients().await?;
        self.await_engines_disconnected().await;
        self.kernel.finalize_stop().await;

        self.handle.set_state(NodeState::Stopped);

        Ok(())
    }
762
763    fn drain_channels(
764        &self,
765        time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
766        data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
767        data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
768        exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
769        exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
770    ) {
771        let mut drained = 0;
772
773        while let Ok(handler) = time_evt_rx.try_recv() {
774            AsyncRunner::handle_time_event(handler);
775            drained += 1;
776        }
777        while let Ok(cmd) = data_cmd_rx.try_recv() {
778            AsyncRunner::handle_data_command(cmd);
779            drained += 1;
780        }
781        while let Ok(evt) = data_evt_rx.try_recv() {
782            AsyncRunner::handle_data_event(evt);
783            drained += 1;
784        }
785        while let Ok(cmd) = exec_cmd_rx.try_recv() {
786            AsyncRunner::handle_exec_command(cmd);
787            drained += 1;
788        }
789        while let Ok(evt) = exec_evt_rx.try_recv() {
790            AsyncRunner::handle_exec_event(evt);
791            drained += 1;
792        }
793
794        if drained > 0 {
795            log::info!("Drained {drained} remaining events during shutdown");
796        }
797    }
798
    /// Gets the node's environment.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.kernel.environment()
    }

    /// Gets a reference to the underlying kernel.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }

    /// Gets an exclusive reference to the underlying kernel.
    #[must_use]
    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }

    /// Gets the node's trader ID.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.kernel.trader_id()
    }

    /// Gets the node's instance ID.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.kernel.instance_id()
    }

    /// Returns the current node state.
    ///
    /// Delegates to the shared [`LiveNodeHandle`], so the value reflects
    /// updates made from other threads via a cloned handle.
    #[must_use]
    pub fn state(&self) -> NodeState {
        self.handle.state()
    }

    /// Checks if the live node is currently running.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.state().is_running()
    }
840
841    /// Sets the cache database adapter for persistence.
842    ///
843    /// This allows setting a database adapter (e.g., PostgreSQL, Redis) after the node
844    /// is built but before it starts running. The database adapter is used to persist
845    /// cache data for recovery and state management.
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if the node is already running.
850    pub fn set_cache_database(
851        &mut self,
852        database: Box<dyn CacheDatabaseAdapter>,
853    ) -> anyhow::Result<()> {
854        if self.state() != NodeState::Idle {
855            anyhow::bail!(
856                "Cannot set cache database while node is running, set it before calling start()"
857            );
858        }
859
860        self.kernel.cache().borrow_mut().set_database(database);
861        Ok(())
862    }
863
    /// Gets a reference to the execution manager.
    #[must_use]
    pub const fn exec_manager(&self) -> &ExecutionManager {
        &self.exec_manager
    }

    /// Gets an exclusive reference to the execution manager.
    ///
    /// Used for configuration/adjustment of the manager owned by this node.
    #[must_use]
    pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
        &mut self.exec_manager
    }
875
876    /// Adds an actor to the trader.
877    ///
878    /// This method provides a high-level interface for adding actors to the underlying
879    /// trader without requiring direct access to the kernel. Actors should be added
880    /// after the node is built but before starting the node.
881    ///
882    /// # Errors
883    ///
884    /// Returns an error if:
885    /// - The trader is not in a valid state for adding components.
886    /// - An actor with the same ID is already registered.
887    /// - The node is currently running.
888    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
889    where
890        T: DataActor + Component + Actor + 'static,
891    {
892        if self.state() != NodeState::Idle {
893            anyhow::bail!(
894                "Cannot add actor while node is running, add actors before calling start()"
895            );
896        }
897
898        self.kernel.trader.add_actor(actor)
899    }
900
901    /// Adds an actor to the live node using a factory function.
902    ///
903    /// The factory function is called at registration time to create the actor,
904    /// avoiding cloning issues with non-cloneable actor types.
905    ///
906    /// # Errors
907    ///
908    /// Returns an error if:
909    /// - The node is currently running.
910    /// - The factory function fails to create the actor.
911    /// - The underlying trader registration fails.
912    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
913    where
914        F: FnOnce() -> anyhow::Result<T>,
915        T: DataActor + Component + Actor + 'static,
916    {
917        if self.state() != NodeState::Idle {
918            anyhow::bail!(
919                "Cannot add actor while node is running, add actors before calling start()"
920            );
921        }
922
923        self.kernel.trader.add_actor_from_factory(factory)
924    }
925
926    /// Adds a strategy to the trader.
927    ///
928    /// Strategies are registered in both the component registry (for lifecycle management)
929    /// and the actor registry (for data callbacks via msgbus).
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if:
934    /// - The node is currently running.
935    /// - A strategy with the same ID is already registered.
936    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
937    where
938        T: Strategy + Component + Debug + 'static,
939    {
940        if self.state() != NodeState::Idle {
941            anyhow::bail!(
942                "Cannot add strategy while node is running, add strategies before calling start()"
943            );
944        }
945
946        // Register external order claims before adding strategy (which moves it)
947        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
948        if let Some(claims) = strategy.external_order_claims() {
949            for instrument_id in claims {
950                self.exec_manager
951                    .claim_external_orders(instrument_id, strategy_id);
952            }
953            log_info!(
954                "Registered external order claims for {}: {:?}",
955                strategy_id,
956                strategy.external_order_claims(),
957                color = LogColor::Blue
958            );
959        }
960
961        self.kernel.trader.add_strategy(strategy)
962    }
963}
964
/// Events queued during startup to avoid RefCell borrow conflicts.
///
/// During `connect_clients()`, the data_engine and exec_engine are borrowed
/// across awaits. Processing commands/events that trigger msgbus handlers
/// would try to borrow the same engines, causing a panic.
#[derive(Default)]
struct PendingEvents {
    // Data commands deferred until startup releases the engine borrows
    data_cmds: Vec<DataCommand>,
    // Data events deferred until startup releases the engine borrows
    data_evts: Vec<DataEvent>,
    // Trading commands deferred until startup releases the engine borrows
    exec_cmds: Vec<TradingCommand>,
    // Order events deferred until startup releases the engine borrows
    order_evts: Vec<OrderEventAny>,
}
977
978impl PendingEvents {
979    fn drain(&mut self) {
980        let total = self.data_evts.len()
981            + self.data_cmds.len()
982            + self.exec_cmds.len()
983            + self.order_evts.len();
984
985        if total > 0 {
986            log::debug!(
987                "Processing {total} events/commands queued during startup \
988                 (data_evts={}, data_cmds={}, exec_cmds={}, order_evts={})",
989                self.data_evts.len(),
990                self.data_cmds.len(),
991                self.exec_cmds.len(),
992                self.order_evts.len()
993            );
994        }
995
996        for evt in self.data_evts.drain(..) {
997            AsyncRunner::handle_data_event(evt);
998        }
999        for cmd in self.data_cmds.drain(..) {
1000            AsyncRunner::handle_data_command(cmd);
1001        }
1002        for cmd in self.exec_cmds.drain(..) {
1003            AsyncRunner::handle_exec_command(cmd);
1004        }
1005        for evt in self.order_evts.drain(..) {
1006            AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
1007        }
1008    }
1009}
1010
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    #[rstest]
    #[case(0, NodeState::Idle)]
    #[case(1, NodeState::Starting)]
    #[case(2, NodeState::Running)]
    #[case(3, NodeState::ShuttingDown)]
    #[case(4, NodeState::Stopped)]
    fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
        let decoded = NodeState::from_u8(value);
        assert_eq!(decoded, expected);
    }

    #[rstest]
    #[case(5)]
    #[case(255)]
    #[should_panic(expected = "Invalid NodeState value")]
    fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
        let _ = NodeState::from_u8(value);
    }

    #[rstest]
    fn test_node_state_roundtrip() {
        let all_states = [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ];
        for state in all_states {
            let encoded = state.as_u8();
            assert_eq!(NodeState::from_u8(encoded), state);
        }
    }

    #[rstest]
    fn test_node_state_is_running_only_for_running() {
        assert!(NodeState::Running.is_running());

        let not_running = [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ];
        for state in not_running {
            assert!(!state.is_running());
        }
    }

    #[rstest]
    fn test_handle_initial_state() {
        let handle = LiveNodeHandle::new();

        assert!(!handle.is_running());
        assert!(!handle.should_stop());
        assert_eq!(handle.state(), NodeState::Idle);
    }

    #[rstest]
    fn test_handle_stop_sets_flag() {
        let handle = LiveNodeHandle::new();
        handle.stop();

        assert!(handle.should_stop());
    }

    #[rstest]
    fn test_handle_set_state_running_clears_stop_flag() {
        let handle = LiveNodeHandle::new();
        handle.stop();
        assert!(handle.should_stop());

        handle.set_state(NodeState::Running);

        assert_eq!(handle.state(), NodeState::Running);
        assert!(handle.is_running());
        assert!(!handle.should_stop());
    }

    #[rstest]
    fn test_handle_node_state_transitions() {
        let handle = LiveNodeHandle::new();
        assert_eq!(handle.state(), NodeState::Idle);

        let transitions = [
            (NodeState::Starting, false),
            (NodeState::Running, true),
            (NodeState::ShuttingDown, false),
            (NodeState::Stopped, false),
        ];
        for (next, expect_running) in transitions {
            handle.set_state(next);
            assert_eq!(handle.state(), next);
            assert_eq!(handle.is_running(), expect_running);
        }
    }

    #[rstest]
    fn test_handle_clone_shares_state_bidirectionally() {
        let original = LiveNodeHandle::new();
        let cloned = original.clone();

        // A mutation through the original is observed through the clone
        original.stop();
        assert!(cloned.should_stop());

        // A mutation through the clone is observed through the original
        cloned.set_state(NodeState::Running);
        assert_eq!(original.state(), NodeState::Running);
    }

    #[rstest]
    fn test_handle_stop_flag_independent_of_state() {
        let handle = LiveNodeHandle::new();

        // The stop flag can be raised regardless of the current state
        handle.set_state(NodeState::Starting);
        handle.stop();
        assert_eq!(handle.state(), NodeState::Starting);
        assert!(handle.should_stop());

        // Transitioning to ShuttingDown leaves the flag raised
        handle.set_state(NodeState::ShuttingDown);
        assert!(handle.should_stop());

        // Only entering Running clears it
        handle.set_state(NodeState::Running);
        assert!(!handle.should_stop());
    }

    #[rstest]
    fn test_builder_creation() {
        assert!(LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).is_ok());
    }

    #[rstest]
    fn test_builder_rejects_backtest() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("Backtest"));
    }

    #[rstest]
    fn test_builder_accepts_live_environment() {
        assert!(LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live).is_ok());
    }

    #[rstest]
    fn test_builder_accepts_sandbox_environment() {
        assert!(LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).is_ok());
    }

    #[rstest]
    fn test_builder_fluent_api_chaining() {
        let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
            .unwrap()
            .with_name("TestNode")
            .with_instance_id(UUID4::new())
            .with_load_state(false)
            .with_save_state(true)
            .with_timeout_connection(30)
            .with_timeout_reconciliation(60)
            .with_reconciliation(true)
            .with_reconciliation_lookback_mins(120)
            .with_timeout_portfolio(10)
            .with_timeout_disconnection_secs(5)
            .with_delay_post_stop_secs(3)
            .with_delay_shutdown_secs(10);

        assert_eq!(builder.name(), "TestNode");
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_build_and_initial_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
        assert_eq!(node.environment(), Environment::Sandbox);
        assert_eq!(node.state(), NodeState::Idle);
        assert!(!node.is_running());
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_handle_reflects_node_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        let handle = node.handle();

        assert!(!handle.is_running());
        assert_eq!(handle.state(), NodeState::Idle);
    }
}