1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 cell::RefCell,
22 collections::HashMap,
23 rc::Rc,
24 sync::{
25 Arc,
26 atomic::{AtomicBool, Ordering},
27 },
28 time::Duration,
29};
30
31use nautilus_common::{
32 actor::{Actor, DataActor},
33 clock::LiveClock,
34 component::Component,
35 enums::Environment,
36};
37use nautilus_core::UUID4;
38use nautilus_data::client::DataClientAdapter;
39use nautilus_model::identifiers::TraderId;
40use nautilus_system::{
41 config::NautilusKernelConfig,
42 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
43 kernel::NautilusKernel,
44};
45
46use crate::{config::LiveNodeConfig, runner::AsyncRunner};
47
48#[derive(Clone, Debug)]
52pub struct LiveNodeHandle {
53 pub(crate) stop_flag: Arc<AtomicBool>,
55 pub(crate) running_flag: Arc<AtomicBool>,
57}
58
59impl Default for LiveNodeHandle {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl LiveNodeHandle {
66 #[must_use]
68 pub fn new() -> Self {
69 Self {
70 stop_flag: Arc::new(AtomicBool::new(false)),
71 running_flag: Arc::new(AtomicBool::new(false)),
72 }
73 }
74
75 #[must_use]
77 pub fn should_stop(&self) -> bool {
78 self.stop_flag.load(Ordering::Relaxed)
79 }
80
81 #[must_use]
83 pub fn is_running(&self) -> bool {
84 self.running_flag.load(Ordering::Relaxed)
85 }
86
87 pub fn stop(&self) {
89 self.stop_flag.store(true, Ordering::Relaxed);
90 }
91
92 pub(crate) fn set_running(&self, running: bool) {
94 self.running_flag.store(running, Ordering::Relaxed);
95 if running {
96 self.stop_flag.store(false, Ordering::Relaxed);
98 }
99 }
100}
101
102#[derive(Debug)]
107#[cfg_attr(
108 feature = "python",
109 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
110)]
111pub struct LiveNode {
112 clock: Rc<RefCell<LiveClock>>,
113 kernel: NautilusKernel,
114 runner: AsyncRunner,
115 config: LiveNodeConfig,
116 is_running: bool,
117 handle: LiveNodeHandle,
119}
120
121impl LiveNode {
122 #[must_use]
124 pub fn handle(&self) -> LiveNodeHandle {
125 self.handle.clone()
126 }
127 pub fn builder(
133 name: String,
134 trader_id: TraderId,
135 environment: Environment,
136 ) -> anyhow::Result<LiveNodeBuilder> {
137 LiveNodeBuilder::new(name, trader_id, environment)
138 }
139
140 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
150 let mut config = config.unwrap_or_default();
151 config.environment = Environment::Live;
152
153 match config.environment() {
155 Environment::Sandbox | Environment::Live => {}
156 Environment::Backtest => {
157 anyhow::bail!("LiveNode cannot be used with Backtest environment");
158 }
159 }
160
161 let runner = AsyncRunner::new();
162 let clock = Rc::new(RefCell::new(LiveClock::default()));
163 let kernel = NautilusKernel::new(name, config.clone())?;
164
165 log::info!("LiveNode built successfully with kernel config");
166
167 Ok(Self {
168 clock,
169 kernel,
170 runner,
171 config,
172 is_running: false,
173 handle: LiveNodeHandle::new(),
174 })
175 }
176
177 pub async fn start(&mut self) -> anyhow::Result<()> {
183 if self.is_running {
184 anyhow::bail!("Already running");
185 }
186
187 self.kernel.start_async().await;
188 self.is_running = true;
189 self.handle.set_running(true);
190
191 Ok(())
192 }
193
194 pub async fn stop(&mut self) -> anyhow::Result<()> {
200 if !self.is_running {
201 anyhow::bail!("Not running");
202 }
203
204 self.kernel.stop_async().await;
205 self.is_running = false;
206 self.handle.set_running(false);
207
208 Ok(())
209 }
210
211 pub async fn run(&mut self) -> anyhow::Result<()> {
220 self.start().await?;
221
222 tokio::select! {
223 () = self.runner.run() => {
225 log::info!("AsyncRunner finished");
226 }
227 () = async {
229 while !self.handle.should_stop() {
230 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
231 }
232 log::info!("Received stop signal from handle");
233 } => {
234 self.runner.stop();
235 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
237 }
238 result = tokio::signal::ctrl_c() => {
240 match result {
241 Ok(()) => {
242 log::info!("Received SIGINT, shutting down");
243 self.runner.stop();
244 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
246 }
247 Err(e) => {
248 log::error!("Failed to listen for SIGINT: {e}");
249 }
250 }
251 }
252 }
253
254 log::debug!("AsyncRunner and signal handling finished");
255
256 self.stop().await?;
257 Ok(())
258 }
259
260 #[must_use]
262 pub fn environment(&self) -> Environment {
263 self.kernel.environment()
264 }
265
266 #[must_use]
268 pub const fn kernel(&self) -> &NautilusKernel {
269 &self.kernel
270 }
271
272 #[must_use]
274 pub(crate) const fn kernel_mut(&mut self) -> &mut NautilusKernel {
275 &mut self.kernel
276 }
277
278 #[must_use]
280 pub fn trader_id(&self) -> TraderId {
281 self.kernel.trader_id()
282 }
283
284 #[must_use]
286 pub const fn instance_id(&self) -> UUID4 {
287 self.kernel.instance_id()
288 }
289
290 #[must_use]
292 pub const fn is_running(&self) -> bool {
293 self.is_running
294 }
295
296 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
309 where
310 T: DataActor + Component + Actor + 'static,
311 {
312 if self.is_running {
313 anyhow::bail!(
314 "Cannot add actor while node is running. Add actors before calling start()."
315 );
316 }
317
318 self.kernel.trader.add_actor(actor)
319 }
320
321 pub(crate) fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
322 where
323 T: DataActor + Component + Actor + 'static,
324 {
325 if self.is_running {
326 anyhow::bail!(
327 "Cannot add actor while node is running. Add actors before calling start()."
328 );
329 }
330
331 self.kernel.trader.add_registered_actor(actor)
332 }
333
334 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
346 where
347 F: FnOnce() -> anyhow::Result<T>,
348 T: DataActor + Component + Actor + 'static,
349 {
350 if self.is_running {
351 anyhow::bail!(
352 "Cannot add actor while node is running. Add actors before calling start()."
353 );
354 }
355
356 self.kernel.trader.add_actor_from_factory(factory)
357 }
358}
359
360#[derive(Debug)]
365#[cfg_attr(
366 feature = "python",
367 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
368)]
369pub struct LiveNodeBuilder {
370 config: LiveNodeConfig,
371 data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
372 exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
373 data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
374 exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
375}
376
377impl LiveNodeBuilder {
378 pub fn new(
384 name: String,
385 trader_id: TraderId,
386 environment: Environment,
387 ) -> anyhow::Result<Self> {
388 match environment {
389 Environment::Sandbox | Environment::Live => {}
390 Environment::Backtest => {
391 anyhow::bail!("LiveNode cannot be used with Backtest environment");
392 }
393 }
394
395 let config = LiveNodeConfig {
396 environment,
397 trader_id,
398 ..Default::default()
399 };
400
401 Ok(Self {
402 config,
403 data_client_factories: HashMap::new(),
404 exec_client_factories: HashMap::new(),
405 data_client_configs: HashMap::new(),
406 exec_client_configs: HashMap::new(),
407 })
408 }
409
410 #[must_use]
412 pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
413 self.config.instance_id = Some(instance_id);
414 self
415 }
416
417 #[must_use]
419 pub const fn with_load_state(mut self, load_state: bool) -> Self {
420 self.config.load_state = load_state;
421 self
422 }
423
424 #[must_use]
426 pub const fn with_save_state(mut self, save_state: bool) -> Self {
427 self.config.save_state = save_state;
428 self
429 }
430
431 #[must_use]
433 pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
434 self.config.timeout_connection = Duration::from_secs(timeout_secs);
435 self
436 }
437
438 #[must_use]
440 pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
441 self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
442 self
443 }
444
445 #[must_use]
447 pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
448 self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
449 self
450 }
451
452 #[must_use]
454 pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
455 self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
456 self
457 }
458
459 #[must_use]
461 pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
462 self.config.delay_post_stop = Duration::from_secs(delay_secs);
463 self
464 }
465
466 #[must_use]
468 pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
469 self.config.timeout_shutdown = Duration::from_secs(delay_secs);
470 self
471 }
472
473 pub fn add_data_client(
479 mut self,
480 name: Option<String>,
481 factory: Box<dyn DataClientFactory>,
482 config: Box<dyn ClientConfig>,
483 ) -> anyhow::Result<Self> {
484 let name = name.unwrap_or_else(|| factory.name().to_string());
485
486 if self.data_client_factories.contains_key(&name) {
487 anyhow::bail!("Data client '{name}' is already registered");
488 }
489
490 self.data_client_factories.insert(name.clone(), factory);
491 self.data_client_configs.insert(name, config);
492 Ok(self)
493 }
494
495 pub fn add_exec_client(
501 mut self,
502 name: Option<String>,
503 factory: Box<dyn ExecutionClientFactory>,
504 config: Box<dyn ClientConfig>,
505 ) -> anyhow::Result<Self> {
506 let name = name.unwrap_or_else(|| factory.name().to_string());
507
508 if self.exec_client_factories.contains_key(&name) {
509 anyhow::bail!("Execution client '{name}' is already registered");
510 }
511
512 self.exec_client_factories.insert(name.clone(), factory);
513 self.exec_client_configs.insert(name, config);
514 Ok(self)
515 }
516
517 pub fn build(mut self) -> anyhow::Result<LiveNode> {
528 log::info!(
529 "Building LiveNode with {} data clients",
530 self.data_client_factories.len()
531 );
532
533 let runner = AsyncRunner::new();
534 let clock = Rc::new(RefCell::new(LiveClock::default()));
535 let kernel = NautilusKernel::new("LiveNode".to_string(), self.config.clone())?;
536
537 for (name, factory) in self.data_client_factories {
539 if let Some(config) = self.data_client_configs.remove(&name) {
540 log::info!("Creating data client '{name}'");
541
542 let client =
543 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
544
545 log::info!("Registering data client '{name}' with data engine");
546
547 let client_id = client.client_id();
548 let venue = client.venue();
549 let adapter = DataClientAdapter::new(
550 client_id, venue, true, true, client,
553 );
554
555 kernel
556 .data_engine
557 .borrow_mut()
558 .register_client(adapter, venue);
559
560 log::info!("Successfully registered data client '{name}' ({client_id})");
561 } else {
562 log::warn!("No config found for data client factory '{name}'");
563 }
564 }
565
566 for (name, factory) in self.exec_client_factories {
568 if let Some(config) = self.exec_client_configs.remove(&name) {
569 log::info!("Creating execution client '{name}'");
570
571 let client =
572 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
573
574 log::info!("Registering execution client '{name}' with execution engine");
575
576 } else {
579 log::warn!("No config found for execution client factory '{name}'");
580 }
581 }
582
583 log::info!("Built successfully");
584
585 Ok(LiveNode {
586 clock,
587 kernel,
588 runner,
589 config: self.config,
590 is_running: false,
591 handle: LiveNodeHandle::new(),
592 })
593 }
594}
595
596#[cfg(test)]
601mod tests {
602 use nautilus_model::identifiers::TraderId;
603 use rstest::*;
604
605 use super::*;
606
607 #[rstest]
608 fn test_trading_node_builder_creation() {
609 let result = LiveNode::builder(
610 "TestNode".to_string(),
611 TraderId::from("TRADER-001"),
612 Environment::Sandbox,
613 );
614
615 assert!(result.is_ok());
616 }
617
618 #[rstest]
619 fn test_trading_node_builder_rejects_backtest() {
620 let result = LiveNode::builder(
621 "TestNode".to_string(),
622 TraderId::from("TRADER-001"),
623 Environment::Backtest,
624 );
625
626 assert!(result.is_err());
627 assert!(
628 result
629 .unwrap_err()
630 .to_string()
631 .contains("Backtest environment")
632 );
633 }
634
635 #[rstest]
636 fn test_trading_node_builder_fluent_api() {
637 let result = LiveNode::builder(
638 "TestNode".to_string(),
639 TraderId::from("TRADER-001"),
640 Environment::Live,
641 );
642
643 assert!(result.is_ok());
644 let _builder = result
645 .unwrap()
646 .with_timeout_connection(30)
647 .with_load_state(false);
648
649 }
651
652 #[rstest]
653 fn test_trading_node_build() {
654 #[cfg(feature = "python")]
655 pyo3::prepare_freethreaded_python();
656
657 let builder_result = LiveNode::builder(
658 "TestNode".to_string(),
659 TraderId::from("TRADER-001"),
660 Environment::Sandbox,
661 );
662
663 assert!(builder_result.is_ok());
664 let build_result = builder_result.unwrap().build();
665
666 assert!(build_result.is_ok());
667 let node = build_result.unwrap();
668 assert!(!node.is_running());
669 assert_eq!(node.environment(), Environment::Sandbox);
670 }
671
672 #[rstest]
673 fn test_builder_rejects_backtest_environment() {
674 let result = LiveNode::builder(
675 "TestNode".to_string(),
676 TraderId::from("TRADER-001"),
677 Environment::Backtest,
678 );
679
680 assert!(result.is_err());
681 assert!(
682 result
683 .unwrap_err()
684 .to_string()
685 .contains("Backtest environment")
686 );
687 }
688}