1use std::{
17 fmt::Debug,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
25use nautilus_common::{
26 actor::{Actor, DataActor},
27 component::Component,
28 enums::{Environment, LogColor},
29 log_info,
30 messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
31 timer::TimeEventHandlerV2,
32};
33use nautilus_core::UUID4;
34use nautilus_model::{
35 events::OrderEventAny,
36 identifiers::{StrategyId, TraderId},
37};
38use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
39use nautilus_trading::strategy::Strategy;
40
41use crate::{
42 builder::LiveNodeBuilder,
43 config::LiveNodeConfig,
44 manager::{ExecutionManager, ExecutionManagerConfig},
45 runner::{AsyncRunner, AsyncRunnerChannels},
46};
47
48#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
50#[repr(u8)]
51pub enum NodeState {
52 #[default]
53 Idle = 0,
54 Starting = 1,
55 Running = 2,
56 ShuttingDown = 3,
57 Stopped = 4,
58}
59
60impl NodeState {
61 #[must_use]
67 pub const fn from_u8(value: u8) -> Self {
68 match value {
69 0 => Self::Idle,
70 1 => Self::Starting,
71 2 => Self::Running,
72 3 => Self::ShuttingDown,
73 4 => Self::Stopped,
74 _ => panic!("Invalid NodeState value"),
75 }
76 }
77
78 #[must_use]
80 pub const fn as_u8(self) -> u8 {
81 self as u8
82 }
83
84 #[must_use]
86 pub const fn is_running(&self) -> bool {
87 matches!(self, Self::Running)
88 }
89}
90
91#[derive(Clone, Debug)]
96pub struct LiveNodeHandle {
97 pub(crate) stop_flag: Arc<AtomicBool>,
99 pub(crate) state: Arc<AtomicU8>,
101}
102
103impl Default for LiveNodeHandle {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl LiveNodeHandle {
110 #[must_use]
112 pub fn new() -> Self {
113 Self {
114 stop_flag: Arc::new(AtomicBool::new(false)),
115 state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
116 }
117 }
118
119 pub(crate) fn set_state(&self, state: NodeState) {
121 self.state.store(state.as_u8(), Ordering::Relaxed);
122 if state == NodeState::Running {
123 self.stop_flag.store(false, Ordering::Relaxed);
125 }
126 }
127
128 #[must_use]
130 pub fn state(&self) -> NodeState {
131 NodeState::from_u8(self.state.load(Ordering::Relaxed))
132 }
133
134 #[must_use]
136 pub fn should_stop(&self) -> bool {
137 self.stop_flag.load(Ordering::Relaxed)
138 }
139
140 #[must_use]
142 pub fn is_running(&self) -> bool {
143 self.state().is_running()
144 }
145
146 pub fn stop(&self) {
148 self.stop_flag.store(true, Ordering::Relaxed);
149 }
150}
151
152#[derive(Debug)]
157#[cfg_attr(
158 feature = "python",
159 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
160)]
161pub struct LiveNode {
162 kernel: NautilusKernel,
163 runner: Option<AsyncRunner>,
164 config: LiveNodeConfig,
165 handle: LiveNodeHandle,
166 exec_manager: ExecutionManager,
167 shutdown_deadline: Option<tokio::time::Instant>,
168 #[cfg(feature = "python")]
169 #[allow(dead_code)] python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
171}
172
173impl LiveNode {
174 #[must_use]
178 pub(crate) fn new_from_builder(
179 kernel: NautilusKernel,
180 runner: AsyncRunner,
181 config: LiveNodeConfig,
182 exec_manager: ExecutionManager,
183 ) -> Self {
184 Self {
185 kernel,
186 runner: Some(runner),
187 config,
188 handle: LiveNodeHandle::new(),
189 exec_manager,
190 shutdown_deadline: None,
191 #[cfg(feature = "python")]
192 python_actors: Vec::new(),
193 }
194 }
195
196 pub fn builder(
202 trader_id: TraderId,
203 environment: Environment,
204 ) -> anyhow::Result<LiveNodeBuilder> {
205 LiveNodeBuilder::new(trader_id, environment)
206 }
207
208 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
218 let mut config = config.unwrap_or_default();
219 config.environment = Environment::Live;
220
221 match config.environment() {
222 Environment::Sandbox | Environment::Live => {}
223 Environment::Backtest => {
224 anyhow::bail!("LiveNode cannot be used with Backtest environment");
225 }
226 }
227
228 let runner = AsyncRunner::new();
229 let kernel = NautilusKernel::new(name, config.clone())?;
230
231 let exec_manager_config =
232 ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
233 let exec_manager = ExecutionManager::new(
234 kernel.clock.clone(),
235 kernel.cache.clone(),
236 exec_manager_config,
237 );
238
239 log::info!("LiveNode built successfully with kernel config");
240
241 Ok(Self {
242 kernel,
243 runner: Some(runner),
244 config,
245 handle: LiveNodeHandle::new(),
246 exec_manager,
247 shutdown_deadline: None,
248 #[cfg(feature = "python")]
249 python_actors: Vec::new(),
250 })
251 }
252
253 #[must_use]
255 pub fn handle(&self) -> LiveNodeHandle {
256 self.handle.clone()
257 }
258
259 pub async fn start(&mut self) -> anyhow::Result<()> {
265 if self.state().is_running() {
266 anyhow::bail!("Already running");
267 }
268
269 self.handle.set_state(NodeState::Starting);
270
271 self.kernel.start_async().await;
272 self.kernel.connect_clients().await?;
273 self.await_engines_connected().await?;
274 self.perform_startup_reconciliation().await?;
275
276 self.handle.set_state(NodeState::Running);
277
278 Ok(())
279 }
280
281 pub async fn stop(&mut self) -> anyhow::Result<()> {
290 if !self.state().is_running() {
291 anyhow::bail!("Not running");
292 }
293
294 self.handle.set_state(NodeState::ShuttingDown);
295
296 self.kernel.stop_trader();
297 let delay = self.kernel.delay_post_stop();
298 log::info!("Awaiting residual events ({delay:?})...");
299
300 tokio::time::sleep(delay).await;
301 self.finalize_stop().await
302 }
303
304 async fn await_engines_connected(&self) -> anyhow::Result<()> {
306 let start = Instant::now();
307 let timeout = self.config.timeout_connection;
308 let interval = Duration::from_millis(100);
309
310 while start.elapsed() < timeout {
311 if self.kernel.check_engines_connected() {
312 log::info!("All engine clients connected");
313 return Ok(());
314 }
315 tokio::time::sleep(interval).await;
316 }
317
318 anyhow::bail!("Timeout waiting for engine clients to connect after {timeout:?}")
319 }
320
321 async fn await_engines_disconnected(&self) -> anyhow::Result<()> {
323 let start = Instant::now();
324 let timeout = self.config.timeout_disconnection;
325 let interval = Duration::from_millis(100);
326
327 while start.elapsed() < timeout {
328 if self.kernel.check_engines_disconnected() {
329 log::info!("All engine clients disconnected");
330 return Ok(());
331 }
332 tokio::time::sleep(interval).await;
333 }
334
335 anyhow::bail!("Timeout waiting for engine clients to disconnect after {timeout:?}")
336 }
337
338 #[allow(clippy::await_holding_refcell_ref)] async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
348 if !self.config.exec_engine.reconciliation {
349 log::info!("Startup reconciliation disabled");
350 return Ok(());
351 }
352
353 log_info!(
354 "Starting execution state reconciliation...",
355 color = LogColor::Blue
356 );
357
358 let lookback_mins = self
359 .config
360 .exec_engine
361 .reconciliation_lookback_mins
362 .map(|m| m as u64);
363
364 let timeout = self.config.timeout_reconciliation;
365 let start = Instant::now();
366 let client_ids = self.kernel.exec_engine.borrow().client_ids();
367
368 for client_id in client_ids {
369 if start.elapsed() > timeout {
370 log::warn!("Reconciliation timeout reached, stopping early");
371 break;
372 }
373
374 log_info!(
375 "Requesting mass status from {}...",
376 client_id,
377 color = LogColor::Blue
378 );
379
380 let mass_status_result = self
381 .kernel
382 .exec_engine
383 .borrow_mut()
384 .generate_mass_status(&client_id, lookback_mins)
385 .await;
386
387 match mass_status_result {
388 Ok(Some(mass_status)) => {
389 log_info!(
390 "Reconciling ExecutionMassStatus for {}",
391 client_id,
392 color = LogColor::Blue
393 );
394 let events = self
395 .exec_manager
396 .reconcile_execution_mass_status(mass_status)
397 .await;
398
399 if events.is_empty() {
400 log_info!(
401 "Reconciliation for {} succeeded",
402 client_id,
403 color = LogColor::Blue
404 );
405 } else {
406 log::info!(
407 color = LogColor::Blue as u8;
408 "Reconciliation for {} generated {} events",
409 client_id,
410 events.len()
411 );
412
413 let mut exec_engine = self.kernel.exec_engine.borrow_mut();
414 for event in events {
415 exec_engine.process(&event);
416 }
417 }
418 }
419 Ok(None) => {
420 log::warn!(
421 "No mass status available from {client_id} \
422 (likely adapter error when generating reports)"
423 );
424 }
425 Err(e) => {
426 log::warn!("Failed to get mass status from {client_id}: {e}");
427 }
428 }
429 }
430
431 self.kernel.portfolio.borrow_mut().initialize_orders();
432 self.kernel.portfolio.borrow_mut().initialize_positions();
433
434 let elapsed_secs = start.elapsed().as_secs_f64();
435 log_info!(
436 "Startup reconciliation completed in {:.2}s",
437 elapsed_secs,
438 color = LogColor::Blue
439 );
440
441 Ok(())
442 }
443
444 pub async fn run(&mut self) -> anyhow::Result<()> {
466 if self.state().is_running() {
467 anyhow::bail!("Already running");
468 }
469
470 let Some(runner) = self.runner.take() else {
471 anyhow::bail!("Runner already consumed - run() called twice");
472 };
473
474 let AsyncRunnerChannels {
475 mut time_evt_rx,
476 mut data_evt_rx,
477 mut data_cmd_rx,
478 mut exec_evt_rx,
479 mut exec_cmd_rx,
480 } = runner.take_channels();
481
482 log::info!("Event loop starting");
483
484 self.handle.set_state(NodeState::Starting);
485 self.kernel.start_async().await;
486
487 let stop_handle = self.handle.clone();
488 let mut pending = PendingEvents::default();
489
490 {
495 let startup_future = self.complete_startup();
496 tokio::pin!(startup_future);
497
498 loop {
499 tokio::select! {
500 biased;
501
502 result = &mut startup_future => {
503 result?;
504 break;
506 }
507 Some(handler) = time_evt_rx.recv() => {
508 AsyncRunner::handle_time_event(handler);
509 }
510 Some(evt) = data_evt_rx.recv() => {
511 pending.data_evts.push(evt);
512 }
513 Some(cmd) = data_cmd_rx.recv() => {
514 pending.data_cmds.push(cmd);
515 }
516 Some(evt) = exec_evt_rx.recv() => {
517 match evt {
519 ExecutionEvent::Account(_) | ExecutionEvent::Report(_) => {
520 AsyncRunner::handle_exec_event(evt);
521 }
522 ExecutionEvent::Order(order_evt) => {
523 pending.order_evts.push(order_evt);
524 }
525 }
526 }
527 Some(cmd) = exec_cmd_rx.recv() => {
528 pending.exec_cmds.push(cmd);
529 }
530 }
531 }
532 }
533
534 pending.drain();
535
536 let mut residual_events = 0usize;
538
539 loop {
540 let shutdown_deadline = self.shutdown_deadline;
541 let is_shutting_down = self.state() == NodeState::ShuttingDown;
542
543 tokio::select! {
544 Some(handler) = time_evt_rx.recv() => {
545 AsyncRunner::handle_time_event(handler);
546 if is_shutting_down {
547 log::debug!("Residual time event");
548 residual_events += 1;
549 }
550 }
551 Some(evt) = data_evt_rx.recv() => {
552 if is_shutting_down {
553 log::debug!("Residual data event: {evt:?}");
554 residual_events += 1;
555 }
556 AsyncRunner::handle_data_event(evt);
557 }
558 Some(cmd) = data_cmd_rx.recv() => {
559 if is_shutting_down {
560 log::debug!("Residual data command: {cmd:?}");
561 residual_events += 1;
562 }
563 AsyncRunner::handle_data_command(cmd);
564 }
565 Some(evt) = exec_evt_rx.recv() => {
566 if is_shutting_down {
567 log::debug!("Residual exec event: {evt:?}");
568 residual_events += 1;
569 }
570 AsyncRunner::handle_exec_event(evt);
571 }
572 Some(cmd) = exec_cmd_rx.recv() => {
573 if is_shutting_down {
574 log::debug!("Residual exec command: {cmd:?}");
575 residual_events += 1;
576 }
577 AsyncRunner::handle_exec_command(cmd);
578 }
579 result = tokio::signal::ctrl_c(), if self.state() == NodeState::Running => {
580 match result {
581 Ok(()) => log::info!("Received SIGINT, shutting down"),
582 Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
583 }
584 self.initiate_shutdown();
585 }
586 () = async {
587 loop {
588 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
589 if stop_handle.should_stop() {
590 log::info!("Received stop signal from handle");
591 return;
592 }
593 }
594 }, if self.state() == NodeState::Running => {
595 self.initiate_shutdown();
596 }
597 () = async {
598 match shutdown_deadline {
599 Some(deadline) => tokio::time::sleep_until(deadline).await,
600 None => std::future::pending::<()>().await,
601 }
602 }, if self.state() == NodeState::ShuttingDown => {
603 break;
604 }
605 }
606 }
607
608 if residual_events > 0 {
609 log::debug!("Processed {residual_events} residual events during shutdown");
610 }
611
612 let _ = self.kernel.cache().borrow().check_residuals();
613
614 self.finalize_stop().await?;
615
616 self.drain_channels(
618 &mut time_evt_rx,
619 &mut data_evt_rx,
620 &mut data_cmd_rx,
621 &mut exec_evt_rx,
622 &mut exec_cmd_rx,
623 );
624
625 log::info!("Event loop stopped");
626
627 Ok(())
628 }
629
630 async fn complete_startup(&mut self) -> anyhow::Result<()> {
631 self.kernel.connect_clients().await?;
632 self.await_engines_connected().await?;
633 self.perform_startup_reconciliation().await?;
634
635 self.handle.set_state(NodeState::Running);
636
637 Ok(())
638 }
639
640 fn initiate_shutdown(&mut self) {
641 self.kernel.stop_trader();
642 let delay = self.kernel.delay_post_stop();
643 log::info!("Awaiting residual events ({delay:?})...");
644
645 self.shutdown_deadline = Some(tokio::time::Instant::now() + delay);
646 self.handle.set_state(NodeState::ShuttingDown);
647 }
648
649 async fn finalize_stop(&mut self) -> anyhow::Result<()> {
650 self.kernel.disconnect_clients().await?;
651 self.await_engines_disconnected().await?;
652 self.kernel.finalize_stop().await;
653
654 self.handle.set_state(NodeState::Stopped);
655
656 Ok(())
657 }
658
659 fn drain_channels(
660 &self,
661 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
662 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
663 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
664 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
665 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
666 ) {
667 let mut drained = 0;
668
669 while let Ok(handler) = time_evt_rx.try_recv() {
670 AsyncRunner::handle_time_event(handler);
671 drained += 1;
672 }
673 while let Ok(cmd) = data_cmd_rx.try_recv() {
674 AsyncRunner::handle_data_command(cmd);
675 drained += 1;
676 }
677 while let Ok(evt) = data_evt_rx.try_recv() {
678 AsyncRunner::handle_data_event(evt);
679 drained += 1;
680 }
681 while let Ok(cmd) = exec_cmd_rx.try_recv() {
682 AsyncRunner::handle_exec_command(cmd);
683 drained += 1;
684 }
685 while let Ok(evt) = exec_evt_rx.try_recv() {
686 AsyncRunner::handle_exec_event(evt);
687 drained += 1;
688 }
689
690 if drained > 0 {
691 log::info!("Drained {drained} remaining events during shutdown");
692 }
693 }
694
695 #[must_use]
697 pub fn environment(&self) -> Environment {
698 self.kernel.environment()
699 }
700
701 #[must_use]
703 pub const fn kernel(&self) -> &NautilusKernel {
704 &self.kernel
705 }
706
707 #[must_use]
709 pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
710 &mut self.kernel
711 }
712
713 #[must_use]
715 pub fn trader_id(&self) -> TraderId {
716 self.kernel.trader_id()
717 }
718
719 #[must_use]
721 pub const fn instance_id(&self) -> UUID4 {
722 self.kernel.instance_id()
723 }
724
725 #[must_use]
727 pub fn state(&self) -> NodeState {
728 self.handle.state()
729 }
730
731 #[must_use]
733 pub fn is_running(&self) -> bool {
734 self.state().is_running()
735 }
736
737 #[must_use]
739 pub const fn exec_manager(&self) -> &ExecutionManager {
740 &self.exec_manager
741 }
742
743 #[must_use]
745 pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
746 &mut self.exec_manager
747 }
748
749 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
762 where
763 T: DataActor + Component + Actor + 'static,
764 {
765 if self.state() != NodeState::Idle {
766 anyhow::bail!(
767 "Cannot add actor while node is running, add actors before calling start()"
768 );
769 }
770
771 self.kernel.trader.add_actor(actor)
772 }
773
774 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
786 where
787 F: FnOnce() -> anyhow::Result<T>,
788 T: DataActor + Component + Actor + 'static,
789 {
790 if self.state() != NodeState::Idle {
791 anyhow::bail!(
792 "Cannot add actor while node is running, add actors before calling start()"
793 );
794 }
795
796 self.kernel.trader.add_actor_from_factory(factory)
797 }
798
799 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
810 where
811 T: Strategy + Component + Debug + 'static,
812 {
813 if self.state() != NodeState::Idle {
814 anyhow::bail!(
815 "Cannot add strategy while node is running, add strategies before calling start()"
816 );
817 }
818
819 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
821 if let Some(claims) = strategy.external_order_claims() {
822 for instrument_id in claims {
823 self.exec_manager
824 .claim_external_orders(instrument_id, strategy_id);
825 }
826 log_info!(
827 "Registered external order claims for {}: {:?}",
828 strategy_id,
829 strategy.external_order_claims(),
830 color = LogColor::Blue
831 );
832 }
833
834 self.kernel.trader.add_strategy(strategy)
835 }
836}
837
838#[derive(Default)]
844struct PendingEvents {
845 data_cmds: Vec<DataCommand>,
846 data_evts: Vec<DataEvent>,
847 exec_cmds: Vec<TradingCommand>,
848 order_evts: Vec<OrderEventAny>,
849}
850
851impl PendingEvents {
852 fn drain(&mut self) {
853 let total = self.data_evts.len()
854 + self.data_cmds.len()
855 + self.exec_cmds.len()
856 + self.order_evts.len();
857
858 if total > 0 {
859 log::debug!(
860 "Processing {total} events/commands queued during startup \
861 (data_evts={}, data_cmds={}, exec_cmds={}, order_evts={})",
862 self.data_evts.len(),
863 self.data_cmds.len(),
864 self.exec_cmds.len(),
865 self.order_evts.len()
866 );
867 }
868
869 for evt in self.data_evts.drain(..) {
870 AsyncRunner::handle_data_event(evt);
871 }
872 for cmd in self.data_cmds.drain(..) {
873 AsyncRunner::handle_data_command(cmd);
874 }
875 for cmd in self.exec_cmds.drain(..) {
876 AsyncRunner::handle_exec_command(cmd);
877 }
878 for evt in self.order_evts.drain(..) {
879 AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
880 }
881 }
882}
883
884#[cfg(test)]
885mod tests {
886 use nautilus_model::identifiers::TraderId;
887 use rstest::*;
888
889 use super::*;
890
891 #[rstest]
892 #[case(0, NodeState::Idle)]
893 #[case(1, NodeState::Starting)]
894 #[case(2, NodeState::Running)]
895 #[case(3, NodeState::ShuttingDown)]
896 #[case(4, NodeState::Stopped)]
897 fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
898 assert_eq!(NodeState::from_u8(value), expected);
899 }
900
901 #[rstest]
902 #[case(5)]
903 #[case(255)]
904 #[should_panic(expected = "Invalid NodeState value")]
905 fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
906 let _ = NodeState::from_u8(value);
907 }
908
909 #[rstest]
910 fn test_node_state_roundtrip() {
911 for state in [
912 NodeState::Idle,
913 NodeState::Starting,
914 NodeState::Running,
915 NodeState::ShuttingDown,
916 NodeState::Stopped,
917 ] {
918 assert_eq!(NodeState::from_u8(state.as_u8()), state);
919 }
920 }
921
922 #[rstest]
923 fn test_node_state_is_running_only_for_running() {
924 assert!(!NodeState::Idle.is_running());
925 assert!(!NodeState::Starting.is_running());
926 assert!(NodeState::Running.is_running());
927 assert!(!NodeState::ShuttingDown.is_running());
928 assert!(!NodeState::Stopped.is_running());
929 }
930
931 #[rstest]
932 fn test_handle_initial_state() {
933 let handle = LiveNodeHandle::new();
934
935 assert_eq!(handle.state(), NodeState::Idle);
936 assert!(!handle.should_stop());
937 assert!(!handle.is_running());
938 }
939
940 #[rstest]
941 fn test_handle_stop_sets_flag() {
942 let handle = LiveNodeHandle::new();
943
944 handle.stop();
945
946 assert!(handle.should_stop());
947 }
948
949 #[rstest]
950 fn test_handle_set_state_running_clears_stop_flag() {
951 let handle = LiveNodeHandle::new();
952 handle.stop();
953 assert!(handle.should_stop());
954
955 handle.set_state(NodeState::Running);
956
957 assert!(!handle.should_stop());
958 assert!(handle.is_running());
959 assert_eq!(handle.state(), NodeState::Running);
960 }
961
962 #[rstest]
963 fn test_handle_node_state_transitions() {
964 let handle = LiveNodeHandle::new();
965 assert_eq!(handle.state(), NodeState::Idle);
966
967 handle.set_state(NodeState::Starting);
968 assert_eq!(handle.state(), NodeState::Starting);
969 assert!(!handle.is_running());
970
971 handle.set_state(NodeState::Running);
972 assert_eq!(handle.state(), NodeState::Running);
973 assert!(handle.is_running());
974
975 handle.set_state(NodeState::ShuttingDown);
976 assert_eq!(handle.state(), NodeState::ShuttingDown);
977 assert!(!handle.is_running());
978
979 handle.set_state(NodeState::Stopped);
980 assert_eq!(handle.state(), NodeState::Stopped);
981 assert!(!handle.is_running());
982 }
983
984 #[rstest]
985 fn test_handle_clone_shares_state_bidirectionally() {
986 let handle1 = LiveNodeHandle::new();
987 let handle2 = handle1.clone();
988
989 handle1.stop();
991 assert!(handle2.should_stop());
992
993 handle2.set_state(NodeState::Running);
995 assert_eq!(handle1.state(), NodeState::Running);
996 }
997
998 #[rstest]
999 fn test_handle_stop_flag_independent_of_state() {
1000 let handle = LiveNodeHandle::new();
1001
1002 handle.set_state(NodeState::Starting);
1004 handle.stop();
1005 assert!(handle.should_stop());
1006 assert_eq!(handle.state(), NodeState::Starting);
1007
1008 handle.set_state(NodeState::ShuttingDown);
1010 assert!(handle.should_stop()); handle.set_state(NodeState::Running);
1013 assert!(!handle.should_stop()); }
1015
1016 #[rstest]
1017 fn test_builder_creation() {
1018 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
1019
1020 assert!(result.is_ok());
1021 }
1022
1023 #[rstest]
1024 fn test_builder_rejects_backtest() {
1025 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);
1026
1027 assert!(result.is_err());
1028 assert!(result.unwrap_err().to_string().contains("Backtest"));
1029 }
1030
1031 #[rstest]
1032 fn test_builder_accepts_live_environment() {
1033 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);
1034
1035 assert!(result.is_ok());
1036 }
1037
1038 #[rstest]
1039 fn test_builder_accepts_sandbox_environment() {
1040 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
1041
1042 assert!(result.is_ok());
1043 }
1044
1045 #[rstest]
1046 fn test_builder_fluent_api_chaining() {
1047 let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
1048 .unwrap()
1049 .with_name("TestNode")
1050 .with_instance_id(UUID4::new())
1051 .with_load_state(false)
1052 .with_save_state(true)
1053 .with_timeout_connection(30)
1054 .with_timeout_reconciliation(60)
1055 .with_reconciliation(true)
1056 .with_reconciliation_lookback_mins(120)
1057 .with_timeout_portfolio(10)
1058 .with_timeout_disconnection_secs(5)
1059 .with_delay_post_stop_secs(3)
1060 .with_delay_shutdown_secs(10);
1061
1062 assert_eq!(builder.name(), "TestNode");
1063 }
1064
1065 #[cfg(feature = "python")]
1066 #[rstest]
1067 fn test_node_build_and_initial_state() {
1068 let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
1069 .unwrap()
1070 .with_name("TestNode")
1071 .build()
1072 .unwrap();
1073
1074 assert_eq!(node.state(), NodeState::Idle);
1075 assert!(!node.is_running());
1076 assert_eq!(node.environment(), Environment::Sandbox);
1077 assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
1078 }
1079
1080 #[cfg(feature = "python")]
1081 #[rstest]
1082 fn test_node_handle_reflects_node_state() {
1083 let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
1084 .unwrap()
1085 .with_name("TestNode")
1086 .build()
1087 .unwrap();
1088
1089 let handle = node.handle();
1090
1091 assert_eq!(handle.state(), NodeState::Idle);
1092 assert!(!handle.is_running());
1093 }
1094}