nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fmt::Debug,
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::{Duration, Instant},
24};
25
26use nautilus_common::{
27    actor::{Actor, DataActor},
28    component::Component,
29    enums::Environment,
30    messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
31    timer::TimeEventHandlerV2,
32};
33use nautilus_core::UUID4;
34use nautilus_data::client::DataClientAdapter;
35use nautilus_model::identifiers::TraderId;
36use nautilus_system::{
37    config::NautilusKernelConfig,
38    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
39    kernel::NautilusKernel,
40};
41use nautilus_trading::strategy::Strategy;
42
43use crate::{
44    config::LiveNodeConfig,
45    runner::{AsyncRunner, AsyncRunnerChannels},
46};
47
48/// A thread-safe handle to control a `LiveNode` from other threads.
49/// This allows starting, stopping, and querying the node's state
50/// without requiring the node itself to be Send + Sync.
51#[derive(Clone, Debug)]
52pub struct LiveNodeHandle {
53    /// Atomic flag indicating if the node should stop.
54    pub(crate) stop_flag: Arc<AtomicBool>,
55    /// Atomic flag indicating if the node is currently running.
56    pub(crate) running_flag: Arc<AtomicBool>,
57}
58
59impl Default for LiveNodeHandle {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl LiveNodeHandle {
66    /// Creates a new handle with default (stopped) state.
67    #[must_use]
68    pub fn new() -> Self {
69        Self {
70            stop_flag: Arc::new(AtomicBool::new(false)),
71            running_flag: Arc::new(AtomicBool::new(false)),
72        }
73    }
74
75    /// Returns whether the node should stop.
76    #[must_use]
77    pub fn should_stop(&self) -> bool {
78        self.stop_flag.load(Ordering::Relaxed)
79    }
80
81    /// Returns whether the node is currently running.
82    #[must_use]
83    pub fn is_running(&self) -> bool {
84        self.running_flag.load(Ordering::Relaxed)
85    }
86
87    /// Signals the node to stop.
88    pub fn stop(&self) {
89        self.stop_flag.store(true, Ordering::Relaxed);
90    }
91
92    /// Marks the node as running (internal use).
93    pub(crate) fn set_running(&self, running: bool) {
94        self.running_flag.store(running, Ordering::Relaxed);
95        if running {
96            // Clear stop flag when starting
97            self.stop_flag.store(false, Ordering::Relaxed);
98        }
99    }
100}
101
102/// High-level abstraction for a live Nautilus system node.
103///
104/// Provides a simplified interface for running live systems
105/// with automatic client management and lifecycle handling.
106#[derive(Debug)]
107#[cfg_attr(
108    feature = "python",
109    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
110)]
111pub struct LiveNode {
112    kernel: NautilusKernel,
113    runner: Option<AsyncRunner>,
114    config: LiveNodeConfig,
115    is_running: bool,
116    handle: LiveNodeHandle,
117    #[cfg(feature = "python")]
118    #[allow(dead_code)] // TODO: Under development
119    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
120}
121
122impl LiveNode {
123    /// Returns a thread-safe handle to control this node.
124    #[must_use]
125    pub fn handle(&self) -> LiveNodeHandle {
126        self.handle.clone()
127    }
128
129    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the environment is invalid for live trading.
134    pub fn builder(
135        trader_id: TraderId,
136        environment: Environment,
137    ) -> anyhow::Result<LiveNodeBuilder> {
138        LiveNodeBuilder::new(trader_id, environment)
139    }
140
141    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
142    ///
143    /// This is a convenience method for creating a live node with a pre-configured
144    /// kernel configuration, bypassing the builder pattern. If no config is provided,
145    /// a default configuration will be used.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if kernel construction fails.
150    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
151        let mut config = config.unwrap_or_default();
152        config.environment = Environment::Live;
153
154        match config.environment() {
155            Environment::Sandbox | Environment::Live => {}
156            Environment::Backtest => {
157                anyhow::bail!("LiveNode cannot be used with Backtest environment");
158            }
159        }
160
161        let runner = AsyncRunner::new();
162        let kernel = NautilusKernel::new(name, config.clone())?;
163
164        log::info!("LiveNode built successfully with kernel config");
165
166        Ok(Self {
167            kernel,
168            runner: Some(runner),
169            config,
170            is_running: false,
171            handle: LiveNodeHandle::new(),
172            #[cfg(feature = "python")]
173            python_actors: Vec::new(),
174        })
175    }
176
177    /// Starts the live node.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if startup fails.
182    pub async fn start(&mut self) -> anyhow::Result<()> {
183        if self.is_running {
184            anyhow::bail!("Already running");
185        }
186
187        self.kernel.start_async().await;
188        self.kernel.connect_clients().await?;
189        self.await_engines_connected().await?;
190
191        self.is_running = true;
192        self.handle.set_running(true);
193
194        Ok(())
195    }
196
197    /// Stop the live node.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if shutdown fails.
202    pub async fn stop(&mut self) -> anyhow::Result<()> {
203        if !self.is_running {
204            anyhow::bail!("Not running");
205        }
206
207        self.kernel.stop_async().await;
208        self.kernel.disconnect_clients().await?;
209        self.await_engines_disconnected().await?;
210
211        self.is_running = false;
212        self.handle.set_running(false);
213
214        Ok(())
215    }
216
217    /// Awaits engine clients to connect with timeout.
218    async fn await_engines_connected(&self) -> anyhow::Result<()> {
219        let start = Instant::now();
220        let timeout = self.config.timeout_connection;
221        let interval = Duration::from_millis(100);
222
223        while start.elapsed() < timeout {
224            if self.kernel.check_engines_connected() {
225                log::info!("All engine clients connected");
226                return Ok(());
227            }
228            tokio::time::sleep(interval).await;
229        }
230
231        anyhow::bail!("Timeout waiting for engine clients to connect after {timeout:?}")
232    }
233
234    /// Awaits engine clients to disconnect with timeout.
235    async fn await_engines_disconnected(&self) -> anyhow::Result<()> {
236        let start = Instant::now();
237        let timeout = self.config.timeout_disconnection;
238        let interval = Duration::from_millis(100);
239
240        while start.elapsed() < timeout {
241            if self.kernel.check_engines_disconnected() {
242                log::info!("All engine clients disconnected");
243                return Ok(());
244            }
245            tokio::time::sleep(interval).await;
246        }
247
248        anyhow::bail!("Timeout waiting for engine clients to disconnect after {timeout:?}")
249    }
250
251    /// Run the live node with automatic shutdown handling.
252    ///
253    /// This method starts the node, runs indefinitely, and handles graceful shutdown
254    /// on interrupt signals.
255    ///
256    /// # Thread Safety
257    ///
258    /// The event loop runs directly on the current thread (not spawned) because the
259    /// msgbus uses thread-local storage. Endpoints registered by the kernel are only
260    /// accessible from the same thread.
261    ///
262    /// # Shutdown Sequence
263    ///
264    /// 1. Signal received (SIGINT or handle stop).
265    /// 2. Kernel shutdown begins.
266    /// 3. Remaining events are drained from channels.
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if the node fails to start or encounters a runtime error.
271    pub async fn run(&mut self) -> anyhow::Result<()> {
272        if self.runner.is_none() {
273            anyhow::bail!("Runner already consumed - run() called twice");
274        }
275
276        self.start().await?;
277
278        // SAFETY: We checked is_none() above and start() doesn't consume the runner
279        let Some(runner) = self.runner.take() else {
280            unreachable!("Runner was verified to exist before start()")
281        };
282
283        let AsyncRunnerChannels {
284            mut time_evt_rx,
285            mut data_evt_rx,
286            mut data_cmd_rx,
287            mut exec_evt_rx,
288            mut exec_cmd_rx,
289        } = runner.take_channels();
290
291        log::info!("Event loop starting");
292
293        loop {
294            tokio::select! {
295                result = tokio::signal::ctrl_c() => {
296                    match result {
297                        Ok(()) => log::info!("Received SIGINT, shutting down"),
298                        Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
299                    }
300                    break;
301                }
302                Some(handler) = time_evt_rx.recv() => {
303                    AsyncRunner::handle_time_event(handler);
304                }
305                Some(cmd) = data_cmd_rx.recv() => {
306                    AsyncRunner::handle_data_command(cmd);
307                }
308                Some(evt) = data_evt_rx.recv() => {
309                    AsyncRunner::handle_data_event(evt);
310                }
311                Some(cmd) = exec_cmd_rx.recv() => {
312                    AsyncRunner::handle_exec_command(cmd);
313                }
314                Some(evt) = exec_evt_rx.recv() => {
315                    AsyncRunner::handle_exec_event(evt);
316                }
317                () = async {
318                    loop {
319                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
320                        if self.handle.should_stop() {
321                            log::info!("Received stop signal from handle");
322                            return;
323                        }
324                    }
325                } => {
326                    break;
327                }
328            }
329        }
330
331        self.stop().await?;
332
333        self.drain_channels(
334            &mut time_evt_rx,
335            &mut data_evt_rx,
336            &mut data_cmd_rx,
337            &mut exec_evt_rx,
338            &mut exec_cmd_rx,
339        );
340
341        log::info!("Event loop stopped");
342
343        Ok(())
344    }
345
346    fn drain_channels(
347        &self,
348        time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
349        data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
350        data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
351        exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
352        exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
353    ) {
354        let mut drained = 0;
355
356        while let Ok(handler) = time_evt_rx.try_recv() {
357            AsyncRunner::handle_time_event(handler);
358            drained += 1;
359        }
360        while let Ok(cmd) = data_cmd_rx.try_recv() {
361            AsyncRunner::handle_data_command(cmd);
362            drained += 1;
363        }
364        while let Ok(evt) = data_evt_rx.try_recv() {
365            AsyncRunner::handle_data_event(evt);
366            drained += 1;
367        }
368        while let Ok(cmd) = exec_cmd_rx.try_recv() {
369            AsyncRunner::handle_exec_command(cmd);
370            drained += 1;
371        }
372        while let Ok(evt) = exec_evt_rx.try_recv() {
373            AsyncRunner::handle_exec_event(evt);
374            drained += 1;
375        }
376
377        if drained > 0 {
378            log::info!("Drained {drained} remaining events during shutdown");
379        }
380    }
381
382    /// Gets the node's environment.
383    #[must_use]
384    pub fn environment(&self) -> Environment {
385        self.kernel.environment()
386    }
387
388    /// Gets a reference to the underlying kernel.
389    #[must_use]
390    pub const fn kernel(&self) -> &NautilusKernel {
391        &self.kernel
392    }
393
394    /// Gets an exclusive reference to the underlying kernel.
395    #[must_use]
396    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
397        &mut self.kernel
398    }
399
400    /// Gets the node's trader ID.
401    #[must_use]
402    pub fn trader_id(&self) -> TraderId {
403        self.kernel.trader_id()
404    }
405
406    /// Gets the node's instance ID.
407    #[must_use]
408    pub const fn instance_id(&self) -> UUID4 {
409        self.kernel.instance_id()
410    }
411
412    /// Checks if the live node is currently running.
413    #[must_use]
414    pub const fn is_running(&self) -> bool {
415        self.is_running
416    }
417
418    /// Adds an actor to the trader.
419    ///
420    /// This method provides a high-level interface for adding actors to the underlying
421    /// trader without requiring direct access to the kernel. Actors should be added
422    /// after the node is built but before starting the node.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if:
427    /// - The trader is not in a valid state for adding components.
428    /// - An actor with the same ID is already registered.
429    /// - The node is currently running.
430    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
431    where
432        T: DataActor + Component + Actor + 'static,
433    {
434        if self.is_running {
435            anyhow::bail!(
436                "Cannot add actor while node is running. Add actors before calling start()."
437            );
438        }
439
440        self.kernel.trader.add_actor(actor)
441    }
442
443    /// Adds an actor to the live node using a factory function.
444    ///
445    /// The factory function is called at registration time to create the actor,
446    /// avoiding cloning issues with non-cloneable actor types.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if:
451    /// - The node is currently running.
452    /// - The factory function fails to create the actor.
453    /// - The underlying trader registration fails.
454    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
455    where
456        F: FnOnce() -> anyhow::Result<T>,
457        T: DataActor + Component + Actor + 'static,
458    {
459        if self.is_running {
460            anyhow::bail!(
461                "Cannot add actor while node is running, add actors before calling start()"
462            );
463        }
464
465        self.kernel.trader.add_actor_from_factory(factory)
466    }
467
468    /// Adds a strategy to the trader.
469    ///
470    /// Strategies are registered in both the component registry (for lifecycle management)
471    /// and the actor registry (for data callbacks via msgbus).
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if:
476    /// - The node is currently running.
477    /// - A strategy with the same ID is already registered.
478    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
479    where
480        T: Strategy + Component + Debug + 'static,
481    {
482        if self.is_running {
483            anyhow::bail!(
484                "Cannot add strategy while node is running, add strategies before calling start()"
485            );
486        }
487
488        self.kernel.trader.add_strategy(strategy)
489    }
490}
491
492/// Builder for constructing a [`LiveNode`] with a fluent API.
493///
494/// Provides configuration options specific to live nodes,
495/// including client factory registration and timeout settings.
496#[derive(Debug)]
497#[cfg_attr(
498    feature = "python",
499    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
500)]
501pub struct LiveNodeBuilder {
502    name: String,
503    config: LiveNodeConfig,
504    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
505    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
506    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
507    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
508}
509
510impl LiveNodeBuilder {
511    /// Creates a new [`LiveNodeBuilder`] with required parameters.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if `environment` is invalid (BACKTEST).
516    pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
517        match environment {
518            Environment::Sandbox | Environment::Live => {}
519            Environment::Backtest => {
520                anyhow::bail!("LiveNode cannot be used with Backtest environment");
521            }
522        }
523
524        let config = LiveNodeConfig {
525            environment,
526            trader_id,
527            ..Default::default()
528        };
529
530        Ok(Self {
531            name: "LiveNode".to_string(),
532            config,
533            data_client_factories: HashMap::new(),
534            exec_client_factories: HashMap::new(),
535            data_client_configs: HashMap::new(),
536            exec_client_configs: HashMap::new(),
537        })
538    }
539
540    /// Returns the name for the node.
541    #[must_use]
542    pub fn name(&self) -> &str {
543        &self.name
544    }
545
546    /// Set the name for the node.
547    #[must_use]
548    pub fn with_name(mut self, name: impl Into<String>) -> Self {
549        self.name = name.into();
550        self
551    }
552
553    /// Set the instance ID for the node.
554    #[must_use]
555    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
556        self.config.instance_id = Some(instance_id);
557        self
558    }
559
560    /// Configure whether to load state on startup.
561    #[must_use]
562    pub const fn with_load_state(mut self, load_state: bool) -> Self {
563        self.config.load_state = load_state;
564        self
565    }
566
567    /// Configure whether to save state on shutdown.
568    #[must_use]
569    pub const fn with_save_state(mut self, save_state: bool) -> Self {
570        self.config.save_state = save_state;
571        self
572    }
573
574    /// Set the connection timeout in seconds.
575    #[must_use]
576    pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
577        self.config.timeout_connection = Duration::from_secs(timeout_secs);
578        self
579    }
580
581    /// Set the reconciliation timeout in seconds.
582    #[must_use]
583    pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
584        self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
585        self
586    }
587
588    /// Set the portfolio initialization timeout in seconds.
589    #[must_use]
590    pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
591        self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
592        self
593    }
594
595    /// Set the disconnection timeout in seconds.
596    #[must_use]
597    pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
598        self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
599        self
600    }
601
602    /// Set the post-stop delay in seconds.
603    #[must_use]
604    pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
605        self.config.delay_post_stop = Duration::from_secs(delay_secs);
606        self
607    }
608
609    /// Set the shutdown timeout in seconds.
610    #[must_use]
611    pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
612        self.config.timeout_shutdown = Duration::from_secs(delay_secs);
613        self
614    }
615
616    /// Adds a data client factory with configuration.
617    ///
618    /// # Errors
619    ///
620    /// Returns an error if a client with the same name is already registered.
621    pub fn add_data_client(
622        mut self,
623        name: Option<String>,
624        factory: Box<dyn DataClientFactory>,
625        config: Box<dyn ClientConfig>,
626    ) -> anyhow::Result<Self> {
627        let name = name.unwrap_or_else(|| factory.name().to_string());
628
629        if self.data_client_factories.contains_key(&name) {
630            anyhow::bail!("Data client '{name}' is already registered");
631        }
632
633        self.data_client_factories.insert(name.clone(), factory);
634        self.data_client_configs.insert(name, config);
635        Ok(self)
636    }
637
638    /// Adds an execution client factory with configuration.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if a client with the same name is already registered.
643    pub fn add_exec_client(
644        mut self,
645        name: Option<String>,
646        factory: Box<dyn ExecutionClientFactory>,
647        config: Box<dyn ClientConfig>,
648    ) -> anyhow::Result<Self> {
649        let name = name.unwrap_or_else(|| factory.name().to_string());
650
651        if self.exec_client_factories.contains_key(&name) {
652            anyhow::bail!("Execution client '{name}' is already registered");
653        }
654
655        self.exec_client_factories.insert(name.clone(), factory);
656        self.exec_client_configs.insert(name, config);
657        Ok(self)
658    }
659
660    /// Build the [`LiveNode`] with the configured settings.
661    ///
662    /// This will:
663    /// 1. Build the underlying kernel.
664    /// 2. Create clients using factories.
665    /// 3. Register clients with engines.
666    ///
667    /// # Errors
668    ///
669    /// Returns an error if node construction fails.
670    pub fn build(mut self) -> anyhow::Result<LiveNode> {
671        log::info!(
672            "Building LiveNode with {} data clients and {} execution clients",
673            self.data_client_factories.len(),
674            self.exec_client_factories.len()
675        );
676
677        // Create runner first to set up global event channels
678        let runner = AsyncRunner::new();
679        let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
680
681        // Create and register data clients
682        for (name, factory) in self.data_client_factories {
683            if let Some(config) = self.data_client_configs.remove(&name) {
684                log::info!("Creating data client '{name}'");
685
686                let client =
687                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
688                let client_id = client.client_id();
689                let venue = client.venue();
690
691                let adapter = DataClientAdapter::new(
692                    client_id, venue, true, // handles_order_book_deltas
693                    true, // handles_order_book_snapshots
694                    client,
695                );
696
697                kernel
698                    .data_engine
699                    .borrow_mut()
700                    .register_client(adapter, venue);
701
702                log::info!("Registered data client '{name}' ({client_id})");
703            } else {
704                log::warn!("No config found for data client factory '{name}'");
705            }
706        }
707
708        // Create and register execution clients
709        for (name, factory) in self.exec_client_factories {
710            if let Some(config) = self.exec_client_configs.remove(&name) {
711                log::info!("Creating execution client '{name}'");
712
713                let client =
714                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
715                let client_id = client.client_id();
716
717                kernel.exec_engine.borrow_mut().register_client(client)?;
718
719                log::info!("Registered execution client '{name}' ({client_id})");
720            } else {
721                log::warn!("No config found for execution client factory '{name}'");
722            }
723        }
724
725        log::info!("Built successfully");
726
727        Ok(LiveNode {
728            kernel,
729            runner: Some(runner),
730            config: self.config,
731            is_running: false,
732            handle: LiveNodeHandle::new(),
733            #[cfg(feature = "python")]
734            python_actors: Vec::new(),
735        })
736    }
737}
738
739#[cfg(test)]
740mod tests {
741    use nautilus_model::identifiers::TraderId;
742    use rstest::*;
743
744    use super::*;
745
746    #[rstest]
747    fn test_trading_node_builder_creation() {
748        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
749
750        assert!(result.is_ok());
751    }
752
753    #[rstest]
754    fn test_trading_node_builder_rejects_backtest() {
755        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);
756
757        assert!(result.is_err());
758        assert!(
759            result
760                .unwrap_err()
761                .to_string()
762                .contains("Backtest environment")
763        );
764    }
765
766    #[rstest]
767    fn test_trading_node_builder_fluent_api() {
768        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);
769
770        assert!(result.is_ok());
771        let _builder = result
772            .unwrap()
773            .with_name("TestNode")
774            .with_timeout_connection(30)
775            .with_load_state(false);
776
777        // Should not panic and methods should chain
778    }
779
780    #[cfg(feature = "python")]
781    #[rstest]
782    fn test_trading_node_build() {
783        let builder_result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
784
785        assert!(builder_result.is_ok());
786        let build_result = builder_result.unwrap().with_name("TestNode").build();
787
788        assert!(build_result.is_ok());
789        let node = build_result.unwrap();
790        assert!(!node.is_running());
791        assert_eq!(node.environment(), Environment::Sandbox);
792    }
793
794    #[rstest]
795    fn test_builder_rejects_backtest_environment() {
796        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);
797
798        assert!(result.is_err());
799        assert!(
800            result
801                .unwrap_err()
802                .to_string()
803                .contains("Backtest environment")
804        );
805    }
806}