nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    cell::RefCell,
22    collections::HashMap,
23    rc::Rc,
24    sync::{
25        Arc,
26        atomic::{AtomicBool, Ordering},
27    },
28    time::Duration,
29};
30
31use nautilus_common::{
32    actor::{Actor, DataActor},
33    clock::LiveClock,
34    component::Component,
35    enums::Environment,
36};
37use nautilus_core::UUID4;
38use nautilus_data::client::DataClientAdapter;
39use nautilus_model::identifiers::TraderId;
40use nautilus_system::{
41    config::NautilusKernelConfig,
42    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
43    kernel::NautilusKernel,
44};
45
46use crate::{config::LiveNodeConfig, runner::AsyncRunner};
47
/// A thread-safe handle for observing and stopping a `LiveNode` from other threads.
///
/// The handle only holds two shared atomic flags, so it can be cloned and used
/// without requiring the node itself to be `Send + Sync`. All clones of a handle
/// share the same underlying flags.
///
/// The handle can request a stop and query state; it cannot start the node.
#[derive(Clone, Debug)]
pub struct LiveNodeHandle {
    /// Atomic flag indicating if the node should stop.
    pub(crate) stop_flag: Arc<AtomicBool>,
    /// Atomic flag indicating if the node is currently running.
    pub(crate) running_flag: Arc<AtomicBool>,
}

impl Default for LiveNodeHandle {
    /// Equivalent to [`LiveNodeHandle::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl LiveNodeHandle {
    /// Creates a new handle with default (stopped) state.
    ///
    /// Both the stop request flag and the running flag start out as `false`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            stop_flag: Arc::new(AtomicBool::new(false)),
            running_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns whether the node should stop.
    #[must_use]
    pub fn should_stop(&self) -> bool {
        self.stop_flag.load(Ordering::SeqCst)
    }

    /// Returns whether the node is currently running.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running_flag.load(Ordering::SeqCst)
    }

    /// Signals the node to stop.
    ///
    /// This only raises the stop flag; it is up to the node to observe it.
    pub fn stop(&self) {
        self.stop_flag.store(true, Ordering::SeqCst);
    }

    /// Marks the node as running (internal use).
    ///
    /// When `running` is `true`, any stop request left over from a previous run
    /// is cleared as well.
    pub(crate) fn set_running(&self, running: bool) {
        if running {
            // Clear the stale stop request *before* publishing the running state,
            // so an observer that sees `is_running() == true` never also sees a
            // stop request that belongs to the previous run.
            self.stop_flag.store(false, Ordering::SeqCst);
        }
        self.running_flag.store(running, Ordering::SeqCst);
    }
}
101
102/// High-level abstraction for a live Nautilus system node.
103///
104/// Provides a simplified interface for running live systems
105/// with automatic client management and lifecycle handling.
106#[derive(Debug)]
107#[cfg_attr(
108    feature = "python",
109    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
110)]
111pub struct LiveNode {
112    clock: Rc<RefCell<LiveClock>>,
113    kernel: NautilusKernel,
114    runner: AsyncRunner,
115    config: LiveNodeConfig,
116    is_running: bool,
117    /// Handle for thread-safe control of this node.
118    handle: LiveNodeHandle,
119}
120
121impl LiveNode {
122    /// Returns a thread-safe handle to control this node.
123    #[must_use]
124    pub fn handle(&self) -> LiveNodeHandle {
125        self.handle.clone()
126    }
127    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the environment is invalid for live trading.
132    pub fn builder(
133        name: String,
134        trader_id: TraderId,
135        environment: Environment,
136    ) -> anyhow::Result<LiveNodeBuilder> {
137        LiveNodeBuilder::new(name, trader_id, environment)
138    }
139
140    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
141    ///
142    /// This is a convenience method for creating a live node with a pre-configured
143    /// kernel configuration, bypassing the builder pattern. If no config is provided,
144    /// a default configuration will be used.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if kernel construction fails.
149    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
150        let mut config = config.unwrap_or_default();
151        config.environment = Environment::Live;
152
153        // Validate environment for live trading
154        match config.environment() {
155            Environment::Sandbox | Environment::Live => {}
156            Environment::Backtest => {
157                anyhow::bail!("LiveNode cannot be used with Backtest environment");
158            }
159        }
160
161        let runner = AsyncRunner::new();
162        let clock = Rc::new(RefCell::new(LiveClock::default()));
163        let kernel = NautilusKernel::new(name, config.clone())?;
164
165        log::info!("LiveNode built successfully with kernel config");
166
167        Ok(Self {
168            clock,
169            kernel,
170            runner,
171            config,
172            is_running: false,
173            handle: LiveNodeHandle::new(),
174        })
175    }
176
177    /// Starts the live node.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if startup fails.
182    pub async fn start(&mut self) -> anyhow::Result<()> {
183        if self.is_running {
184            anyhow::bail!("Already running");
185        }
186
187        self.kernel.start_async().await;
188        self.is_running = true;
189        self.handle.set_running(true);
190
191        Ok(())
192    }
193
194    /// Stop the live node.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if shutdown fails.
199    pub async fn stop(&mut self) -> anyhow::Result<()> {
200        if !self.is_running {
201            anyhow::bail!("Not running");
202        }
203
204        self.kernel.stop_async().await;
205        self.is_running = false;
206        self.handle.set_running(false);
207
208        Ok(())
209    }
210
211    /// Run the live node with automatic shutdown handling.
212    ///
213    /// This method will start the node, run indefinitely, and handle
214    /// graceful shutdown on interrupt signals.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the node fails to start or encounters a runtime error.
219    pub async fn run(&mut self) -> anyhow::Result<()> {
220        self.start().await?;
221
222        tokio::select! {
223            // Run on main thread
224            () = self.runner.run() => {
225                log::info!("AsyncRunner finished");
226            }
227            // Handle stop signal from handle (for Python integration)
228            () = async {
229                while !self.handle.should_stop() {
230                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
231                }
232                log::info!("Received stop signal from handle");
233            } => {
234                self.runner.stop();
235                // Give the AsyncRunner a moment to process the shutdown signal
236                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
237            }
238            // Handle SIGINT signal (fallback for direct Rust usage)
239            result = tokio::signal::ctrl_c() => {
240                match result {
241                    Ok(()) => {
242                        log::info!("Received SIGINT, shutting down");
243                        self.runner.stop();
244                        // Give the AsyncRunner a moment to process the shutdown signal
245                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
246                    }
247                    Err(e) => {
248                        log::error!("Failed to listen for SIGINT: {e}");
249                    }
250                }
251            }
252        }
253
254        log::debug!("AsyncRunner and signal handling finished");
255
256        self.stop().await?;
257        Ok(())
258    }
259
260    /// Gets the node's environment.
261    #[must_use]
262    pub fn environment(&self) -> Environment {
263        self.kernel.environment()
264    }
265
266    /// Gets a reference to the underlying kernel.
267    #[must_use]
268    pub const fn kernel(&self) -> &NautilusKernel {
269        &self.kernel
270    }
271
272    /// Gets an exclusive reference to the underlying kernel.
273    #[must_use]
274    pub(crate) const fn kernel_mut(&mut self) -> &mut NautilusKernel {
275        &mut self.kernel
276    }
277
278    /// Gets the node's trader ID.
279    #[must_use]
280    pub fn trader_id(&self) -> TraderId {
281        self.kernel.trader_id()
282    }
283
284    /// Gets the node's instance ID.
285    #[must_use]
286    pub const fn instance_id(&self) -> UUID4 {
287        self.kernel.instance_id()
288    }
289
290    /// Checks if the live node is currently running.
291    #[must_use]
292    pub const fn is_running(&self) -> bool {
293        self.is_running
294    }
295
296    /// Adds an actor to the trader.
297    ///
298    /// This method provides a high-level interface for adding actors to the underlying
299    /// trader without requiring direct access to the kernel. Actors should be added
300    /// after the node is built but before starting the node.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if:
305    /// - The trader is not in a valid state for adding components.
306    /// - An actor with the same ID is already registered.
307    /// - The node is currently running.
308    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
309    where
310        T: DataActor + Component + Actor + 'static,
311    {
312        if self.is_running {
313            anyhow::bail!(
314                "Cannot add actor while node is running. Add actors before calling start()."
315            );
316        }
317
318        self.kernel.trader.add_actor(actor)
319    }
320
321    pub(crate) fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
322    where
323        T: DataActor + Component + Actor + 'static,
324    {
325        if self.is_running {
326            anyhow::bail!(
327                "Cannot add actor while node is running. Add actors before calling start()."
328            );
329        }
330
331        self.kernel.trader.add_registered_actor(actor)
332    }
333
334    /// Adds an actor to the live node using a factory function.
335    ///
336    /// The factory function is called at registration time to create the actor,
337    /// avoiding cloning issues with non-cloneable actor types.
338    ///
339    /// # Errors
340    ///
341    /// Returns an error if:
342    /// - The node is currently running.
343    /// - The factory function fails to create the actor.
344    /// - The underlying trader registration fails.
345    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
346    where
347        F: FnOnce() -> anyhow::Result<T>,
348        T: DataActor + Component + Actor + 'static,
349    {
350        if self.is_running {
351            anyhow::bail!(
352                "Cannot add actor while node is running. Add actors before calling start()."
353            );
354        }
355
356        self.kernel.trader.add_actor_from_factory(factory)
357    }
358}
359
360/// Builder for constructing a [`LiveNode`] with a fluent API.
361///
362/// Provides configuration options specific to live nodes,
363/// including client factory registration and timeout settings.
364#[derive(Debug)]
365#[cfg_attr(
366    feature = "python",
367    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
368)]
369pub struct LiveNodeBuilder {
370    config: LiveNodeConfig,
371    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
372    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
373    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
374    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
375}
376
377impl LiveNodeBuilder {
378    /// Creates a new [`LiveNodeBuilder`] with required parameters.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if `environment` is invalid (BACKTEST).
383    pub fn new(
384        name: String,
385        trader_id: TraderId,
386        environment: Environment,
387    ) -> anyhow::Result<Self> {
388        match environment {
389            Environment::Sandbox | Environment::Live => {}
390            Environment::Backtest => {
391                anyhow::bail!("LiveNode cannot be used with Backtest environment");
392            }
393        }
394
395        let config = LiveNodeConfig {
396            environment,
397            trader_id,
398            ..Default::default()
399        };
400
401        Ok(Self {
402            config,
403            data_client_factories: HashMap::new(),
404            exec_client_factories: HashMap::new(),
405            data_client_configs: HashMap::new(),
406            exec_client_configs: HashMap::new(),
407        })
408    }
409
410    /// Set the instance ID for the node.
411    #[must_use]
412    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
413        self.config.instance_id = Some(instance_id);
414        self
415    }
416
417    /// Configure whether to load state on startup.
418    #[must_use]
419    pub const fn with_load_state(mut self, load_state: bool) -> Self {
420        self.config.load_state = load_state;
421        self
422    }
423
424    /// Configure whether to save state on shutdown.
425    #[must_use]
426    pub const fn with_save_state(mut self, save_state: bool) -> Self {
427        self.config.save_state = save_state;
428        self
429    }
430
431    /// Set the connection timeout in seconds.
432    #[must_use]
433    pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
434        self.config.timeout_connection = Duration::from_secs(timeout_secs);
435        self
436    }
437
438    /// Set the reconciliation timeout in seconds.
439    #[must_use]
440    pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
441        self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
442        self
443    }
444
445    /// Set the portfolio initialization timeout in seconds.
446    #[must_use]
447    pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
448        self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
449        self
450    }
451
452    /// Set the disconnection timeout in seconds.
453    #[must_use]
454    pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
455        self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
456        self
457    }
458
459    /// Set the post-stop delay in seconds.
460    #[must_use]
461    pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
462        self.config.delay_post_stop = Duration::from_secs(delay_secs);
463        self
464    }
465
466    /// Set the shutdown timeout in seconds.
467    #[must_use]
468    pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
469        self.config.timeout_shutdown = Duration::from_secs(delay_secs);
470        self
471    }
472
473    /// Adds a data client with factory and configuration.
474    ///
475    /// # Errors
476    ///
477    /// Returns an error if a client with the same name is already registered.
478    pub fn add_data_client(
479        mut self,
480        name: Option<String>,
481        factory: Box<dyn DataClientFactory>,
482        config: Box<dyn ClientConfig>,
483    ) -> anyhow::Result<Self> {
484        let name = name.unwrap_or_else(|| factory.name().to_string());
485
486        if self.data_client_factories.contains_key(&name) {
487            anyhow::bail!("Data client '{name}' is already registered");
488        }
489
490        self.data_client_factories.insert(name.clone(), factory);
491        self.data_client_configs.insert(name, config);
492        Ok(self)
493    }
494
495    /// Adds an execution client with factory and configuration.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if a client with the same name is already registered.
500    pub fn add_exec_client(
501        mut self,
502        name: Option<String>,
503        factory: Box<dyn ExecutionClientFactory>,
504        config: Box<dyn ClientConfig>,
505    ) -> anyhow::Result<Self> {
506        let name = name.unwrap_or_else(|| factory.name().to_string());
507
508        if self.exec_client_factories.contains_key(&name) {
509            anyhow::bail!("Execution client '{name}' is already registered");
510        }
511
512        self.exec_client_factories.insert(name.clone(), factory);
513        self.exec_client_configs.insert(name, config);
514        Ok(self)
515    }
516
517    /// Build the [`LiveNode`] with the configured settings.
518    ///
519    /// This will:
520    /// 1. Build the underlying kernel.
521    /// 2. Register all client factories.
522    /// 3. Create and register all clients.
523    ///
524    /// # Errors
525    ///
526    /// Returns an error if node construction fails.
527    pub fn build(mut self) -> anyhow::Result<LiveNode> {
528        log::info!(
529            "Building LiveNode with {} data clients",
530            self.data_client_factories.len()
531        );
532
533        let runner = AsyncRunner::new();
534        let clock = Rc::new(RefCell::new(LiveClock::default()));
535        let kernel = NautilusKernel::new("LiveNode".to_string(), self.config.clone())?;
536
537        // Create and register data clients
538        for (name, factory) in self.data_client_factories {
539            if let Some(config) = self.data_client_configs.remove(&name) {
540                log::info!("Creating data client '{name}'");
541
542                let client =
543                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
544
545                log::info!("Registering data client '{name}' with data engine");
546
547                let client_id = client.client_id();
548                let venue = client.venue();
549                let adapter = DataClientAdapter::new(
550                    client_id, venue, true, // handles_order_book_deltas
551                    true, // handles_order_book_snapshots
552                    client,
553                );
554
555                kernel
556                    .data_engine
557                    .borrow_mut()
558                    .register_client(adapter, venue);
559
560                log::info!("Successfully registered data client '{name}' ({client_id})");
561            } else {
562                log::warn!("No config found for data client factory '{name}'");
563            }
564        }
565
566        // Create and register execution clients
567        for (name, factory) in self.exec_client_factories {
568            if let Some(config) = self.exec_client_configs.remove(&name) {
569                log::info!("Creating execution client '{name}'");
570
571                let client =
572                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
573
574                log::info!("Registering execution client '{name}' with execution engine");
575
576                // TODO: Implement when ExecutionEngine has a register_client method
577                // kernel.exec_engine().register_client(client);
578            } else {
579                log::warn!("No config found for execution client factory '{name}'");
580            }
581        }
582
583        log::info!("Built successfully");
584
585        Ok(LiveNode {
586            clock,
587            kernel,
588            runner,
589            config: self.config,
590            is_running: false,
591            handle: LiveNodeHandle::new(),
592        })
593    }
594}
595
596////////////////////////////////////////////////////////////////////////////////
597// Tests
598////////////////////////////////////////////////////////////////////////////////
599
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    /// The builder accepts a sandbox environment.
    #[rstest]
    fn test_trading_node_builder_creation() {
        let result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Sandbox,
        );

        assert!(result.is_ok());
    }

    /// `LiveNode::builder` refuses a backtest environment.
    #[rstest]
    fn test_trading_node_builder_rejects_backtest() {
        let result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Backtest,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Backtest environment")
        );
    }

    /// Chained setters are applied to the builder's configuration.
    #[rstest]
    fn test_trading_node_builder_fluent_api() {
        let result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Live,
        );

        assert!(result.is_ok());
        let builder = result
            .unwrap()
            .with_timeout_connection(30)
            .with_load_state(false);

        assert_eq!(builder.config.timeout_connection, Duration::from_secs(30));
        assert!(!builder.config.load_state);
    }

    /// A built node is not running and reports the requested environment.
    #[rstest]
    fn test_trading_node_build() {
        #[cfg(feature = "python")]
        pyo3::prepare_freethreaded_python();

        let builder_result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Sandbox,
        );

        assert!(builder_result.is_ok());
        let build_result = builder_result.unwrap().build();

        assert!(build_result.is_ok());
        let node = build_result.unwrap();
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
    }

    /// `LiveNodeBuilder::new` itself (not only `LiveNode::builder`) refuses backtest.
    #[rstest]
    fn test_builder_rejects_backtest_environment() {
        let result = LiveNodeBuilder::new(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Backtest,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Backtest environment")
        );
    }

    /// Handle clones share state; starting clears a stale stop request.
    #[rstest]
    fn test_handle_stop_and_running_flags() {
        let handle = LiveNodeHandle::new();
        let clone = handle.clone();

        assert!(!handle.should_stop());
        assert!(!handle.is_running());

        clone.stop();
        assert!(handle.should_stop());

        handle.set_running(true);
        assert!(clone.is_running());
        assert!(!clone.should_stop());

        handle.set_running(false);
        assert!(!clone.is_running());
    }
}