1use std::{
17 collections::HashMap,
18 fmt::Debug,
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23 time::{Duration, Instant},
24};
25
26use nautilus_common::{
27 actor::{Actor, DataActor},
28 component::Component,
29 enums::Environment,
30 messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
31 timer::TimeEventHandlerV2,
32};
33use nautilus_core::UUID4;
34use nautilus_data::client::DataClientAdapter;
35use nautilus_model::identifiers::TraderId;
36use nautilus_system::{
37 config::NautilusKernelConfig,
38 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
39 kernel::NautilusKernel,
40};
41use nautilus_trading::strategy::Strategy;
42
43use crate::{
44 config::LiveNodeConfig,
45 runner::{AsyncRunner, AsyncRunnerChannels},
46};
47
/// Cloneable handle used to observe a live node and to request that it stops.
///
/// Both flags live behind an [`Arc`], so every clone of a handle reads and
/// writes the same pair of atomics.
#[derive(Clone, Debug, Default)]
pub struct LiveNodeHandle {
    /// Raised by `stop()`; lowered again whenever the node is marked as running.
    pub(crate) stop_flag: Arc<AtomicBool>,
    /// Holds the value most recently passed to `set_running()`.
    pub(crate) running_flag: Arc<AtomicBool>,
}

impl LiveNodeHandle {
    /// Ordering applied to every flag access.
    const ORDERING: Ordering = Ordering::Relaxed;

    /// Creates a handle with both the stop and running flags cleared.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` once a stop has been requested through this handle or a clone of it.
    #[must_use]
    pub fn should_stop(&self) -> bool {
        self.stop_flag.load(Self::ORDERING)
    }

    /// Returns the running state last recorded with `set_running()`.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running_flag.load(Self::ORDERING)
    }

    /// Requests a stop by raising the stop flag.
    pub fn stop(&self) {
        self.stop_flag.store(true, Self::ORDERING);
    }

    /// Records the running state of the node.
    ///
    /// Marking the node as running also discards any stop request recorded
    /// earlier; marking it as not running leaves the stop flag untouched.
    pub(crate) fn set_running(&self, running: bool) {
        self.running_flag.store(running, Self::ORDERING);
        if !running {
            return;
        }
        self.stop_flag.store(false, Self::ORDERING);
    }
}
101
/// Live node that owns a [`NautilusKernel`] together with the runner whose
/// channels feed the node's event loop.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
pub struct LiveNode {
    /// Kernel that this node starts, stops and connects/disconnects clients on.
    kernel: NautilusKernel,
    /// Runner providing the event-loop channels; taken (left as `None`) by `run()`.
    runner: Option<AsyncRunner>,
    /// Node configuration; supplies the connection/disconnection timeouts.
    config: LiveNodeConfig,
    /// Set at the end of a successful `start()` and cleared at the end of a successful `stop()`.
    is_running: bool,
    /// Handle whose clones can request a stop and observe the running state.
    handle: LiveNodeHandle,
    /// Created empty by both constructors; nothing in this file reads it.
    /// NOTE(review): presumably keeps Python actor objects alive — confirm.
    #[cfg(feature = "python")]
    #[allow(dead_code)] python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
121
122impl LiveNode {
    /// Returns a clone of this node's [`LiveNodeHandle`].
    ///
    /// The clone shares its flags with the node, so a stop requested through
    /// it is seen by the node's event loop.
    #[must_use]
    pub fn handle(&self) -> LiveNodeHandle {
        self.handle.clone()
    }
128
    /// Creates a [`LiveNodeBuilder`] for the given trader ID and environment.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`LiveNodeBuilder::new`] produces, which rejects
    /// the `Backtest` environment.
    pub fn builder(
        trader_id: TraderId,
        environment: Environment,
    ) -> anyhow::Result<LiveNodeBuilder> {
        LiveNodeBuilder::new(trader_id, environment)
    }
140
141 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
151 let mut config = config.unwrap_or_default();
152 config.environment = Environment::Live;
153
154 match config.environment() {
155 Environment::Sandbox | Environment::Live => {}
156 Environment::Backtest => {
157 anyhow::bail!("LiveNode cannot be used with Backtest environment");
158 }
159 }
160
161 let runner = AsyncRunner::new();
162 let kernel = NautilusKernel::new(name, config.clone())?;
163
164 log::info!("LiveNode built successfully with kernel config");
165
166 Ok(Self {
167 kernel,
168 runner: Some(runner),
169 config,
170 is_running: false,
171 handle: LiveNodeHandle::new(),
172 #[cfg(feature = "python")]
173 python_actors: Vec::new(),
174 })
175 }
176
177 pub async fn start(&mut self) -> anyhow::Result<()> {
183 if self.is_running {
184 anyhow::bail!("Already running");
185 }
186
187 self.kernel.start_async().await;
188 self.kernel.connect_clients().await?;
189 self.await_engines_connected().await?;
190
191 self.is_running = true;
192 self.handle.set_running(true);
193
194 Ok(())
195 }
196
197 pub async fn stop(&mut self) -> anyhow::Result<()> {
203 if !self.is_running {
204 anyhow::bail!("Not running");
205 }
206
207 self.kernel.stop_async().await;
208 self.kernel.disconnect_clients().await?;
209 self.await_engines_disconnected().await?;
210
211 self.is_running = false;
212 self.handle.set_running(false);
213
214 Ok(())
215 }
216
217 async fn await_engines_connected(&self) -> anyhow::Result<()> {
219 let start = Instant::now();
220 let timeout = self.config.timeout_connection;
221 let interval = Duration::from_millis(100);
222
223 while start.elapsed() < timeout {
224 if self.kernel.check_engines_connected() {
225 log::info!("All engine clients connected");
226 return Ok(());
227 }
228 tokio::time::sleep(interval).await;
229 }
230
231 anyhow::bail!("Timeout waiting for engine clients to connect after {timeout:?}")
232 }
233
234 async fn await_engines_disconnected(&self) -> anyhow::Result<()> {
236 let start = Instant::now();
237 let timeout = self.config.timeout_disconnection;
238 let interval = Duration::from_millis(100);
239
240 while start.elapsed() < timeout {
241 if self.kernel.check_engines_disconnected() {
242 log::info!("All engine clients disconnected");
243 return Ok(());
244 }
245 tokio::time::sleep(interval).await;
246 }
247
248 anyhow::bail!("Timeout waiting for engine clients to disconnect after {timeout:?}")
249 }
250
251 pub async fn run(&mut self) -> anyhow::Result<()> {
272 if self.runner.is_none() {
273 anyhow::bail!("Runner already consumed - run() called twice");
274 }
275
276 self.start().await?;
277
278 let Some(runner) = self.runner.take() else {
280 unreachable!("Runner was verified to exist before start()")
281 };
282
283 let AsyncRunnerChannels {
284 mut time_evt_rx,
285 mut data_evt_rx,
286 mut data_cmd_rx,
287 mut exec_evt_rx,
288 mut exec_cmd_rx,
289 } = runner.take_channels();
290
291 log::info!("Event loop starting");
292
293 loop {
294 tokio::select! {
295 result = tokio::signal::ctrl_c() => {
296 match result {
297 Ok(()) => log::info!("Received SIGINT, shutting down"),
298 Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
299 }
300 break;
301 }
302 Some(handler) = time_evt_rx.recv() => {
303 AsyncRunner::handle_time_event(handler);
304 }
305 Some(cmd) = data_cmd_rx.recv() => {
306 AsyncRunner::handle_data_command(cmd);
307 }
308 Some(evt) = data_evt_rx.recv() => {
309 AsyncRunner::handle_data_event(evt);
310 }
311 Some(cmd) = exec_cmd_rx.recv() => {
312 AsyncRunner::handle_exec_command(cmd);
313 }
314 Some(evt) = exec_evt_rx.recv() => {
315 AsyncRunner::handle_exec_event(evt);
316 }
317 () = async {
318 loop {
319 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
320 if self.handle.should_stop() {
321 log::info!("Received stop signal from handle");
322 return;
323 }
324 }
325 } => {
326 break;
327 }
328 }
329 }
330
331 self.stop().await?;
332
333 self.drain_channels(
334 &mut time_evt_rx,
335 &mut data_evt_rx,
336 &mut data_cmd_rx,
337 &mut exec_evt_rx,
338 &mut exec_cmd_rx,
339 );
340
341 log::info!("Event loop stopped");
342
343 Ok(())
344 }
345
346 fn drain_channels(
347 &self,
348 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
349 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
350 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
351 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
352 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
353 ) {
354 let mut drained = 0;
355
356 while let Ok(handler) = time_evt_rx.try_recv() {
357 AsyncRunner::handle_time_event(handler);
358 drained += 1;
359 }
360 while let Ok(cmd) = data_cmd_rx.try_recv() {
361 AsyncRunner::handle_data_command(cmd);
362 drained += 1;
363 }
364 while let Ok(evt) = data_evt_rx.try_recv() {
365 AsyncRunner::handle_data_event(evt);
366 drained += 1;
367 }
368 while let Ok(cmd) = exec_cmd_rx.try_recv() {
369 AsyncRunner::handle_exec_command(cmd);
370 drained += 1;
371 }
372 while let Ok(evt) = exec_evt_rx.try_recv() {
373 AsyncRunner::handle_exec_event(evt);
374 drained += 1;
375 }
376
377 if drained > 0 {
378 log::info!("Drained {drained} remaining events during shutdown");
379 }
380 }
381
    /// Returns the environment reported by the kernel.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.kernel.environment()
    }
387
    /// Returns a shared reference to the underlying kernel.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
393
    /// Returns an exclusive reference to the underlying kernel.
    #[must_use]
    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
399
    /// Returns the trader ID reported by the kernel.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.kernel.trader_id()
    }
405
    /// Returns the instance ID reported by the kernel.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.kernel.instance_id()
    }
411
    /// Returns `true` between a successful `start()` and the next successful `stop()`.
    #[must_use]
    pub const fn is_running(&self) -> bool {
        self.is_running
    }
417
418 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
431 where
432 T: DataActor + Component + Actor + 'static,
433 {
434 if self.is_running {
435 anyhow::bail!(
436 "Cannot add actor while node is running. Add actors before calling start()."
437 );
438 }
439
440 self.kernel.trader.add_actor(actor)
441 }
442
443 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
455 where
456 F: FnOnce() -> anyhow::Result<T>,
457 T: DataActor + Component + Actor + 'static,
458 {
459 if self.is_running {
460 anyhow::bail!(
461 "Cannot add actor while node is running, add actors before calling start()"
462 );
463 }
464
465 self.kernel.trader.add_actor_from_factory(factory)
466 }
467
468 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
479 where
480 T: Strategy + Component + Debug + 'static,
481 {
482 if self.is_running {
483 anyhow::bail!(
484 "Cannot add strategy while node is running, add strategies before calling start()"
485 );
486 }
487
488 self.kernel.trader.add_strategy(strategy)
489 }
490}
491
/// Builder that collects a node configuration plus data/execution client
/// factories and assembles them into a [`LiveNode`].
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
pub struct LiveNodeBuilder {
    /// Name passed to the kernel on build; starts out as `"LiveNode"`.
    name: String,
    /// Node configuration that the `with_*` methods modify.
    config: LiveNodeConfig,
    /// Data client factories keyed by client name.
    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
    /// Execution client factories keyed by client name.
    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
    /// Data client configurations, stored under the same key as the matching factory.
    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
    /// Execution client configurations, stored under the same key as the matching factory.
    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
}
509
510impl LiveNodeBuilder {
511 pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
517 match environment {
518 Environment::Sandbox | Environment::Live => {}
519 Environment::Backtest => {
520 anyhow::bail!("LiveNode cannot be used with Backtest environment");
521 }
522 }
523
524 let config = LiveNodeConfig {
525 environment,
526 trader_id,
527 ..Default::default()
528 };
529
530 Ok(Self {
531 name: "LiveNode".to_string(),
532 config,
533 data_client_factories: HashMap::new(),
534 exec_client_factories: HashMap::new(),
535 data_client_configs: HashMap::new(),
536 exec_client_configs: HashMap::new(),
537 })
538 }
539
    /// Returns the node name currently configured on the builder.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }
545
    /// Replaces the node name and returns the builder.
    #[must_use]
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.name = name.into();
        self
    }
552
    /// Sets an explicit instance ID on the configuration and returns the builder.
    #[must_use]
    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
        self.config.instance_id = Some(instance_id);
        self
    }
559
    /// Sets the configuration's `load_state` flag and returns the builder.
    #[must_use]
    pub const fn with_load_state(mut self, load_state: bool) -> Self {
        self.config.load_state = load_state;
        self
    }
566
    /// Sets the configuration's `save_state` flag and returns the builder.
    #[must_use]
    pub const fn with_save_state(mut self, save_state: bool) -> Self {
        self.config.save_state = save_state;
        self
    }
573
    /// Sets the connection timeout, given in whole seconds, and returns the builder.
    #[must_use]
    pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
        self.config.timeout_connection = Duration::from_secs(timeout_secs);
        self
    }
580
    /// Sets the reconciliation timeout, given in whole seconds, and returns the builder.
    #[must_use]
    pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
        self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
        self
    }
587
    /// Sets the portfolio timeout, given in whole seconds, and returns the builder.
    #[must_use]
    pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
        self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
        self
    }
594
    /// Sets the disconnection timeout, given in whole seconds, and returns the builder.
    #[must_use]
    pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
        self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
        self
    }
601
    /// Sets the post-stop delay, given in whole seconds, and returns the builder.
    #[must_use]
    pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
        self.config.delay_post_stop = Duration::from_secs(delay_secs);
        self
    }
608
    /// Sets the configuration's `timeout_shutdown`, given in whole seconds, and returns the builder.
    ///
    /// NOTE(review): the method is named after a "delay" but writes the
    /// `timeout_shutdown` field — confirm that this is the intended field.
    #[must_use]
    pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
        self.config.timeout_shutdown = Duration::from_secs(delay_secs);
        self
    }
615
616 pub fn add_data_client(
622 mut self,
623 name: Option<String>,
624 factory: Box<dyn DataClientFactory>,
625 config: Box<dyn ClientConfig>,
626 ) -> anyhow::Result<Self> {
627 let name = name.unwrap_or_else(|| factory.name().to_string());
628
629 if self.data_client_factories.contains_key(&name) {
630 anyhow::bail!("Data client '{name}' is already registered");
631 }
632
633 self.data_client_factories.insert(name.clone(), factory);
634 self.data_client_configs.insert(name, config);
635 Ok(self)
636 }
637
638 pub fn add_exec_client(
644 mut self,
645 name: Option<String>,
646 factory: Box<dyn ExecutionClientFactory>,
647 config: Box<dyn ClientConfig>,
648 ) -> anyhow::Result<Self> {
649 let name = name.unwrap_or_else(|| factory.name().to_string());
650
651 if self.exec_client_factories.contains_key(&name) {
652 anyhow::bail!("Execution client '{name}' is already registered");
653 }
654
655 self.exec_client_factories.insert(name.clone(), factory);
656 self.exec_client_configs.insert(name, config);
657 Ok(self)
658 }
659
660 pub fn build(mut self) -> anyhow::Result<LiveNode> {
671 log::info!(
672 "Building LiveNode with {} data clients and {} execution clients",
673 self.data_client_factories.len(),
674 self.exec_client_factories.len()
675 );
676
677 let runner = AsyncRunner::new();
679 let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
680
681 for (name, factory) in self.data_client_factories {
683 if let Some(config) = self.data_client_configs.remove(&name) {
684 log::info!("Creating data client '{name}'");
685
686 let client =
687 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
688 let client_id = client.client_id();
689 let venue = client.venue();
690
691 let adapter = DataClientAdapter::new(
692 client_id, venue, true, true, client,
695 );
696
697 kernel
698 .data_engine
699 .borrow_mut()
700 .register_client(adapter, venue);
701
702 log::info!("Registered data client '{name}' ({client_id})");
703 } else {
704 log::warn!("No config found for data client factory '{name}'");
705 }
706 }
707
708 for (name, factory) in self.exec_client_factories {
710 if let Some(config) = self.exec_client_configs.remove(&name) {
711 log::info!("Creating execution client '{name}'");
712
713 let client =
714 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
715 let client_id = client.client_id();
716
717 kernel.exec_engine.borrow_mut().register_client(client)?;
718
719 log::info!("Registered execution client '{name}' ({client_id})");
720 } else {
721 log::warn!("No config found for execution client factory '{name}'");
722 }
723 }
724
725 log::info!("Built successfully");
726
727 Ok(LiveNode {
728 kernel,
729 runner: Some(runner),
730 config: self.config,
731 is_running: false,
732 handle: LiveNodeHandle::new(),
733 #[cfg(feature = "python")]
734 python_actors: Vec::new(),
735 })
736 }
737}
738
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    /// A builder can be obtained for a non-backtest environment.
    #[rstest]
    fn test_trading_node_builder_creation() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    /// `LiveNode::builder` refuses the backtest environment with a descriptive error.
    #[rstest]
    fn test_trading_node_builder_rejects_backtest() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Backtest environment")
        );
    }

    /// The chained `with_*` calls must actually be reflected in the builder state.
    #[rstest]
    fn test_trading_node_builder_fluent_api() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);

        assert!(result.is_ok());
        let builder = result
            .unwrap()
            .with_name("TestNode")
            .with_timeout_connection(30)
            .with_load_state(false);

        assert_eq!(builder.name(), "TestNode");
        assert_eq!(builder.config.timeout_connection, Duration::from_secs(30));
        assert!(!builder.config.load_state);
    }

    /// A node built from a sandbox builder is not running and keeps its environment.
    #[cfg(feature = "python")]
    #[rstest]
    fn test_trading_node_build() {
        let builder_result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(builder_result.is_ok());
        let build_result = builder_result.unwrap().with_name("TestNode").build();

        assert!(build_result.is_ok());
        let node = build_result.unwrap();
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
    }

    /// Constructing the builder directly (not via `LiveNode::builder`) applies the same check.
    #[rstest]
    fn test_builder_rejects_backtest_environment() {
        let result = LiveNodeBuilder::new(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Backtest environment")
        );
    }
}