1use std::{
17 fmt::Debug,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
25use nautilus_common::{
26 actor::{Actor, DataActor},
27 cache::database::CacheDatabaseAdapter,
28 component::Component,
29 enums::{Environment, LogColor},
30 log_info,
31 messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
32 timer::TimeEventHandler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36 events::OrderEventAny,
37 identifiers::{StrategyId, TraderId},
38};
39use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
40use nautilus_trading::strategy::Strategy;
41
42use crate::{
43 builder::LiveNodeBuilder,
44 config::LiveNodeConfig,
45 manager::{ExecutionManager, ExecutionManagerConfig},
46 runner::{AsyncRunner, AsyncRunnerChannels},
47};
48
/// Lifecycle state of a live node.
///
/// The enum is `#[repr(u8)]` with explicit discriminants so a state can be
/// stored in (and recovered from) a single byte.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    /// Initial state; also the `Default`.
    #[default]
    Idle = 0,
    /// Startup has begun but the node is not yet running.
    Starting = 1,
    /// The node is running.
    Running = 2,
    /// A shutdown has been initiated.
    ShuttingDown = 3,
    /// The node has stopped.
    Stopped = 4,
}

impl NodeState {
    /// Converts a raw discriminant back into a [`NodeState`].
    ///
    /// # Panics
    ///
    /// Panics with `"Invalid NodeState value"` if `value` does not match any
    /// variant's discriminant.
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        // Derive the patterns from the variants so the numbers live in one place.
        const IDLE: u8 = NodeState::Idle as u8;
        const STARTING: u8 = NodeState::Starting as u8;
        const RUNNING: u8 = NodeState::Running as u8;
        const SHUTTING_DOWN: u8 = NodeState::ShuttingDown as u8;
        const STOPPED: u8 = NodeState::Stopped as u8;

        match value {
            IDLE => Self::Idle,
            STARTING => Self::Starting,
            RUNNING => Self::Running,
            SHUTTING_DOWN => Self::ShuttingDown,
            STOPPED => Self::Stopped,
            _ => panic!("Invalid NodeState value"),
        }
    }

    /// Returns the variant's discriminant as a `u8`.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` only for [`NodeState::Running`].
    #[must_use]
    pub const fn is_running(&self) -> bool {
        match self {
            Self::Running => true,
            Self::Idle | Self::Starting | Self::ShuttingDown | Self::Stopped => false,
        }
    }
}
91
92#[derive(Clone, Debug)]
97pub struct LiveNodeHandle {
98 pub(crate) stop_flag: Arc<AtomicBool>,
100 pub(crate) state: Arc<AtomicU8>,
102}
103
104impl Default for LiveNodeHandle {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl LiveNodeHandle {
111 #[must_use]
113 pub fn new() -> Self {
114 Self {
115 stop_flag: Arc::new(AtomicBool::new(false)),
116 state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
117 }
118 }
119
120 pub(crate) fn set_state(&self, state: NodeState) {
122 self.state.store(state.as_u8(), Ordering::Relaxed);
123 if state == NodeState::Running {
124 self.stop_flag.store(false, Ordering::Relaxed);
126 }
127 }
128
129 #[must_use]
131 pub fn state(&self) -> NodeState {
132 NodeState::from_u8(self.state.load(Ordering::Relaxed))
133 }
134
135 #[must_use]
137 pub fn should_stop(&self) -> bool {
138 self.stop_flag.load(Ordering::Relaxed)
139 }
140
141 #[must_use]
143 pub fn is_running(&self) -> bool {
144 self.state().is_running()
145 }
146
147 pub fn stop(&self) {
149 self.stop_flag.store(true, Ordering::Relaxed);
150 }
151}
152
/// Live node: bundles the kernel, the async runner, the node configuration,
/// a shared state handle and the execution manager.
///
/// With the `python` feature enabled the type is also exposed as an
/// `unsendable` PyO3 class.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
pub struct LiveNode {
    kernel: NautilusKernel,
    // `Some` until `run()` takes it to obtain the event channels.
    runner: Option<AsyncRunner>,
    config: LiveNodeConfig,
    // Shared with callers through `handle()`; holds the node state and stop flag.
    handle: LiveNodeHandle,
    exec_manager: ExecutionManager,
    // Set by `initiate_shutdown`; the `run()` loop exits once this instant passes.
    shutdown_deadline: Option<tokio::time::Instant>,
    // NOTE(review): never read in this file (hence `dead_code`); presumably kept
    // to hold Python actor objects alive - confirm against the Python bindings.
    #[cfg(feature = "python")]
    #[allow(dead_code)]
    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
173
174impl LiveNode {
175 #[must_use]
179 pub(crate) fn new_from_builder(
180 kernel: NautilusKernel,
181 runner: AsyncRunner,
182 config: LiveNodeConfig,
183 exec_manager: ExecutionManager,
184 ) -> Self {
185 Self {
186 kernel,
187 runner: Some(runner),
188 config,
189 handle: LiveNodeHandle::new(),
190 exec_manager,
191 shutdown_deadline: None,
192 #[cfg(feature = "python")]
193 python_actors: Vec::new(),
194 }
195 }
196
    /// Creates a [`LiveNodeBuilder`] for the given trader ID and environment.
    ///
    /// # Errors
    ///
    /// Returns whatever error `LiveNodeBuilder::new` produces for these arguments.
    pub fn builder(
        trader_id: TraderId,
        environment: Environment,
    ) -> anyhow::Result<LiveNodeBuilder> {
        LiveNodeBuilder::new(trader_id, environment)
    }
208
209 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
219 let mut config = config.unwrap_or_default();
220 config.environment = Environment::Live;
221
222 match config.environment() {
223 Environment::Sandbox | Environment::Live => {}
224 Environment::Backtest => {
225 anyhow::bail!("LiveNode cannot be used with Backtest environment");
226 }
227 }
228
229 let runner = AsyncRunner::new();
230 let kernel = NautilusKernel::new(name, config.clone())?;
231
232 let exec_manager_config =
233 ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
234 let exec_manager = ExecutionManager::new(
235 kernel.clock.clone(),
236 kernel.cache.clone(),
237 exec_manager_config,
238 );
239
240 log::info!("LiveNode built successfully with kernel config");
241
242 Ok(Self {
243 kernel,
244 runner: Some(runner),
245 config,
246 handle: LiveNodeHandle::new(),
247 exec_manager,
248 shutdown_deadline: None,
249 #[cfg(feature = "python")]
250 python_actors: Vec::new(),
251 })
252 }
253
    /// Returns a clone of the node's handle.
    ///
    /// The clone shares the node's state and stop flag, so it can be used to
    /// observe the node or request a stop from elsewhere.
    #[must_use]
    pub fn handle(&self) -> LiveNodeHandle {
        self.handle.clone()
    }
259
260 pub async fn start(&mut self) -> anyhow::Result<()> {
266 if self.state().is_running() {
267 anyhow::bail!("Already running");
268 }
269
270 self.handle.set_state(NodeState::Starting);
271
272 self.kernel.start_async().await;
273 self.kernel.connect_clients().await?;
274 self.await_engines_connected().await?;
275
276 if let Some(runner) = self.runner.as_mut() {
278 runner.drain_pending_data_events();
279 }
280
281 self.perform_startup_reconciliation().await?;
282
283 self.kernel.start_trader();
284
285 self.handle.set_state(NodeState::Running);
286
287 Ok(())
288 }
289
290 pub async fn stop(&mut self) -> anyhow::Result<()> {
299 if !self.state().is_running() {
300 anyhow::bail!("Not running");
301 }
302
303 self.handle.set_state(NodeState::ShuttingDown);
304
305 self.kernel.stop_trader();
306 let delay = self.kernel.delay_post_stop();
307 log::info!("Awaiting residual events ({delay:?})...");
308
309 tokio::time::sleep(delay).await;
310 self.finalize_stop().await
311 }
312
313 async fn await_engines_connected(&self) -> anyhow::Result<()> {
315 let start = Instant::now();
316 let timeout = self.config.timeout_connection;
317 let interval = Duration::from_millis(100);
318
319 while start.elapsed() < timeout {
320 if self.kernel.check_engines_connected() {
321 log::info!("All engine clients connected");
322 return Ok(());
323 }
324 tokio::time::sleep(interval).await;
325 }
326
327 anyhow::bail!("Timeout waiting for engine clients to connect after {timeout:?}")
328 }
329
330 async fn await_engines_disconnected(&self) -> anyhow::Result<()> {
332 let start = Instant::now();
333 let timeout = self.config.timeout_disconnection;
334 let interval = Duration::from_millis(100);
335
336 while start.elapsed() < timeout {
337 if self.kernel.check_engines_disconnected() {
338 log::info!("All engine clients disconnected");
339 return Ok(());
340 }
341 tokio::time::sleep(interval).await;
342 }
343
344 anyhow::bail!("Timeout waiting for engine clients to disconnect after {timeout:?}")
345 }
346
    /// Reconciles execution state with each execution client at startup.
    ///
    /// Does nothing (beyond logging) when `config.exec_engine.reconciliation`
    /// is false. Otherwise, for every client ID known to the execution engine:
    /// request a mass status, pass it to the execution manager for
    /// reconciliation, and feed any resulting events back into the execution
    /// engine. Finally the portfolio's orders and positions are initialized.
    ///
    /// A client that yields no mass status or an error is logged as a warning
    /// and skipped; it does not fail the whole reconciliation. The timeout is
    /// only checked between clients, so a single slow client is not interrupted.
    ///
    /// NOTE(review): the mutable `RefCell` borrow of `exec_engine` is held
    /// across the `generate_mass_status(..).await` (hence the clippy allow).
    /// Any other borrow of `exec_engine` while this future is suspended would
    /// panic - confirm nothing polled concurrently touches the engine.
    ///
    /// # Errors
    ///
    /// The visible code always returns `Ok(())`.
    #[allow(clippy::await_holding_refcell_ref)]
    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
        if !self.config.exec_engine.reconciliation {
            log::info!("Startup reconciliation disabled");
            return Ok(());
        }

        log_info!(
            "Starting execution state reconciliation...",
            color = LogColor::Blue
        );

        // NOTE(review): `as` cast; the source integer type is not visible here -
        // confirm the conversion to u64 cannot truncate or sign-extend.
        let lookback_mins = self
            .config
            .exec_engine
            .reconciliation_lookback_mins
            .map(|m| m as u64);

        let timeout = self.config.timeout_reconciliation;
        let start = Instant::now();
        // The immutable borrow ends with this statement, before the loop starts.
        let client_ids = self.kernel.exec_engine.borrow().client_ids();

        for client_id in client_ids {
            // Remaining clients are skipped once the overall budget is spent.
            if start.elapsed() > timeout {
                log::warn!("Reconciliation timeout reached, stopping early");
                break;
            }

            log_info!(
                "Requesting mass status from {}...",
                client_id,
                color = LogColor::Blue
            );

            let mass_status_result = self
                .kernel
                .exec_engine
                .borrow_mut()
                .generate_mass_status(&client_id, lookback_mins)
                .await;

            match mass_status_result {
                Ok(Some(mass_status)) => {
                    log_info!(
                        "Reconciling ExecutionMassStatus for {}",
                        client_id,
                        color = LogColor::Blue
                    );
                    let events = self
                        .exec_manager
                        .reconcile_execution_mass_status(mass_status)
                        .await;

                    if events.is_empty() {
                        log_info!(
                            "Reconciliation for {} succeeded",
                            client_id,
                            color = LogColor::Blue
                        );
                    } else {
                        log::info!(
                            color = LogColor::Blue as u8;
                            "Reconciliation for {} generated {} events",
                            client_id,
                            events.len()
                        );

                        // One mutable borrow for the whole batch; no await inside.
                        let mut exec_engine = self.kernel.exec_engine.borrow_mut();
                        for event in events {
                            exec_engine.process(&event);
                        }
                    }
                }
                Ok(None) => {
                    log::warn!(
                        "No mass status available from {client_id} \
                         (likely adapter error when generating reports)"
                    );
                }
                Err(e) => {
                    log::warn!("Failed to get mass status from {client_id}: {e}");
                }
            }
        }

        self.kernel.portfolio.borrow_mut().initialize_orders();
        self.kernel.portfolio.borrow_mut().initialize_positions();

        let elapsed_secs = start.elapsed().as_secs_f64();
        log_info!(
            "Startup reconciliation completed in {:.2}s",
            elapsed_secs,
            color = LogColor::Blue
        );

        Ok(())
    }
452
453 pub async fn run(&mut self) -> anyhow::Result<()> {
475 if self.state().is_running() {
476 anyhow::bail!("Already running");
477 }
478
479 let Some(runner) = self.runner.take() else {
480 anyhow::bail!("Runner already consumed - run() called twice");
481 };
482
483 let AsyncRunnerChannels {
484 mut time_evt_rx,
485 mut data_evt_rx,
486 mut data_cmd_rx,
487 mut exec_evt_rx,
488 mut exec_cmd_rx,
489 } = runner.take_channels();
490
491 log::info!("Event loop starting");
492
493 self.handle.set_state(NodeState::Starting);
494 self.kernel.start_async().await;
495
496 let stop_handle = self.handle.clone();
497 let mut pending = PendingEvents::default();
498
499 {
504 let startup_future = self.complete_startup();
505 tokio::pin!(startup_future);
506
507 loop {
508 tokio::select! {
509 biased;
510
511 result = &mut startup_future => {
512 result?;
513 break;
514 }
515 Some(handler) = time_evt_rx.recv() => {
516 AsyncRunner::handle_time_event(handler);
517 }
518 Some(evt) = data_evt_rx.recv() => {
519 pending.data_evts.push(evt);
520 }
521 Some(cmd) = data_cmd_rx.recv() => {
522 pending.data_cmds.push(cmd);
523 }
524 Some(evt) = exec_evt_rx.recv() => {
525 match evt {
527 ExecutionEvent::Account(_) | ExecutionEvent::Report(_) => {
528 AsyncRunner::handle_exec_event(evt);
529 }
530 ExecutionEvent::Order(order_evt) => {
531 pending.order_evts.push(order_evt);
532 }
533 }
534 }
535 Some(cmd) = exec_cmd_rx.recv() => {
536 pending.exec_cmds.push(cmd);
537 }
538 }
539 }
540 }
541
542 pending.drain();
543
544 self.kernel.start_trader();
546 self.handle.set_state(NodeState::Running);
547
548 let mut residual_events = 0usize;
550
551 loop {
552 let shutdown_deadline = self.shutdown_deadline;
553 let is_shutting_down = self.state() == NodeState::ShuttingDown;
554
555 tokio::select! {
556 Some(handler) = time_evt_rx.recv() => {
557 AsyncRunner::handle_time_event(handler);
558 if is_shutting_down {
559 log::debug!("Residual time event");
560 residual_events += 1;
561 }
562 }
563 Some(evt) = data_evt_rx.recv() => {
564 if is_shutting_down {
565 log::debug!("Residual data event: {evt:?}");
566 residual_events += 1;
567 }
568 AsyncRunner::handle_data_event(evt);
569 }
570 Some(cmd) = data_cmd_rx.recv() => {
571 if is_shutting_down {
572 log::debug!("Residual data command: {cmd:?}");
573 residual_events += 1;
574 }
575 AsyncRunner::handle_data_command(cmd);
576 }
577 Some(evt) = exec_evt_rx.recv() => {
578 if is_shutting_down {
579 log::debug!("Residual exec event: {evt:?}");
580 residual_events += 1;
581 }
582 AsyncRunner::handle_exec_event(evt);
583 }
584 Some(cmd) = exec_cmd_rx.recv() => {
585 if is_shutting_down {
586 log::debug!("Residual exec command: {cmd:?}");
587 residual_events += 1;
588 }
589 AsyncRunner::handle_exec_command(cmd);
590 }
591 result = tokio::signal::ctrl_c(), if self.state() == NodeState::Running => {
592 match result {
593 Ok(()) => log::info!("Received SIGINT, shutting down"),
594 Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
595 }
596 self.initiate_shutdown();
597 }
598 () = async {
599 loop {
600 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
601 if stop_handle.should_stop() {
602 log::info!("Received stop signal from handle");
603 return;
604 }
605 }
606 }, if self.state() == NodeState::Running => {
607 self.initiate_shutdown();
608 }
609 () = async {
610 match shutdown_deadline {
611 Some(deadline) => tokio::time::sleep_until(deadline).await,
612 None => std::future::pending::<()>().await,
613 }
614 }, if self.state() == NodeState::ShuttingDown => {
615 break;
616 }
617 }
618 }
619
620 if residual_events > 0 {
621 log::debug!("Processed {residual_events} residual events during shutdown");
622 }
623
624 let _ = self.kernel.cache().borrow().check_residuals();
625
626 self.finalize_stop().await?;
627
628 self.drain_channels(
630 &mut time_evt_rx,
631 &mut data_evt_rx,
632 &mut data_cmd_rx,
633 &mut exec_evt_rx,
634 &mut exec_cmd_rx,
635 );
636
637 log::info!("Event loop stopped");
638
639 Ok(())
640 }
641
642 async fn complete_startup(&mut self) -> anyhow::Result<()> {
643 self.kernel.connect_clients().await?;
644 self.await_engines_connected().await?;
645 self.perform_startup_reconciliation().await?;
646 Ok(())
647 }
648
649 fn initiate_shutdown(&mut self) {
650 self.kernel.stop_trader();
651 let delay = self.kernel.delay_post_stop();
652 log::info!("Awaiting residual events ({delay:?})...");
653
654 self.shutdown_deadline = Some(tokio::time::Instant::now() + delay);
655 self.handle.set_state(NodeState::ShuttingDown);
656 }
657
    /// Completes a stop: disconnect clients, wait for the engines to report
    /// disconnected, finalize the kernel, then mark the node `Stopped`.
    ///
    /// # Errors
    ///
    /// Returns an error if disconnecting clients fails or the engines do not
    /// disconnect in time; in that case the state is not set to `Stopped`.
    async fn finalize_stop(&mut self) -> anyhow::Result<()> {
        self.kernel.disconnect_clients().await?;
        self.await_engines_disconnected().await?;
        self.kernel.finalize_stop().await;

        // Only reached when every step above succeeded.
        self.handle.set_state(NodeState::Stopped);

        Ok(())
    }
667
668 fn drain_channels(
669 &self,
670 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
671 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
672 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
673 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
674 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
675 ) {
676 let mut drained = 0;
677
678 while let Ok(handler) = time_evt_rx.try_recv() {
679 AsyncRunner::handle_time_event(handler);
680 drained += 1;
681 }
682 while let Ok(cmd) = data_cmd_rx.try_recv() {
683 AsyncRunner::handle_data_command(cmd);
684 drained += 1;
685 }
686 while let Ok(evt) = data_evt_rx.try_recv() {
687 AsyncRunner::handle_data_event(evt);
688 drained += 1;
689 }
690 while let Ok(cmd) = exec_cmd_rx.try_recv() {
691 AsyncRunner::handle_exec_command(cmd);
692 drained += 1;
693 }
694 while let Ok(evt) = exec_evt_rx.try_recv() {
695 AsyncRunner::handle_exec_event(evt);
696 drained += 1;
697 }
698
699 if drained > 0 {
700 log::info!("Drained {drained} remaining events during shutdown");
701 }
702 }
703
    /// Returns the environment reported by the kernel.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.kernel.environment()
    }
709
    /// Returns a shared reference to the underlying kernel.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
715
    /// Returns a mutable reference to the underlying kernel.
    #[must_use]
    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
721
    /// Returns the trader ID reported by the kernel.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.kernel.trader_id()
    }
727
    /// Returns the instance ID reported by the kernel.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.kernel.instance_id()
    }
733
    /// Returns the current node state as read from the node's handle.
    #[must_use]
    pub fn state(&self) -> NodeState {
        self.handle.state()
    }
739
    /// Returns `true` if the node's current state is `Running`.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.state().is_running()
    }
745
746 pub fn set_cache_database(
756 &mut self,
757 database: Box<dyn CacheDatabaseAdapter>,
758 ) -> anyhow::Result<()> {
759 if self.state() != NodeState::Idle {
760 anyhow::bail!(
761 "Cannot set cache database while node is running, set it before calling start()"
762 );
763 }
764
765 self.kernel.cache().borrow_mut().set_database(database);
766 Ok(())
767 }
768
    /// Returns a shared reference to the execution manager.
    #[must_use]
    pub const fn exec_manager(&self) -> &ExecutionManager {
        &self.exec_manager
    }
774
    /// Returns a mutable reference to the execution manager.
    #[must_use]
    pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
        &mut self.exec_manager
    }
780
781 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
794 where
795 T: DataActor + Component + Actor + 'static,
796 {
797 if self.state() != NodeState::Idle {
798 anyhow::bail!(
799 "Cannot add actor while node is running, add actors before calling start()"
800 );
801 }
802
803 self.kernel.trader.add_actor(actor)
804 }
805
806 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
818 where
819 F: FnOnce() -> anyhow::Result<T>,
820 T: DataActor + Component + Actor + 'static,
821 {
822 if self.state() != NodeState::Idle {
823 anyhow::bail!(
824 "Cannot add actor while node is running, add actors before calling start()"
825 );
826 }
827
828 self.kernel.trader.add_actor_from_factory(factory)
829 }
830
831 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
842 where
843 T: Strategy + Component + Debug + 'static,
844 {
845 if self.state() != NodeState::Idle {
846 anyhow::bail!(
847 "Cannot add strategy while node is running, add strategies before calling start()"
848 );
849 }
850
851 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
853 if let Some(claims) = strategy.external_order_claims() {
854 for instrument_id in claims {
855 self.exec_manager
856 .claim_external_orders(instrument_id, strategy_id);
857 }
858 log_info!(
859 "Registered external order claims for {}: {:?}",
860 strategy_id,
861 strategy.external_order_claims(),
862 color = LogColor::Blue
863 );
864 }
865
866 self.kernel.trader.add_strategy(strategy)
867 }
868}
869
870#[derive(Default)]
876struct PendingEvents {
877 data_cmds: Vec<DataCommand>,
878 data_evts: Vec<DataEvent>,
879 exec_cmds: Vec<TradingCommand>,
880 order_evts: Vec<OrderEventAny>,
881}
882
883impl PendingEvents {
884 fn drain(&mut self) {
885 let total = self.data_evts.len()
886 + self.data_cmds.len()
887 + self.exec_cmds.len()
888 + self.order_evts.len();
889
890 if total > 0 {
891 log::debug!(
892 "Processing {total} events/commands queued during startup \
893 (data_evts={}, data_cmds={}, exec_cmds={}, order_evts={})",
894 self.data_evts.len(),
895 self.data_cmds.len(),
896 self.exec_cmds.len(),
897 self.order_evts.len()
898 );
899 }
900
901 for evt in self.data_evts.drain(..) {
902 AsyncRunner::handle_data_event(evt);
903 }
904 for cmd in self.data_cmds.drain(..) {
905 AsyncRunner::handle_data_command(cmd);
906 }
907 for cmd in self.exec_cmds.drain(..) {
908 AsyncRunner::handle_exec_command(cmd);
909 }
910 for evt in self.order_evts.drain(..) {
911 AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
912 }
913 }
914}
915
// Unit tests for `NodeState`, `LiveNodeHandle` and the `LiveNode` builder entry point.
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    // Every valid discriminant maps to its variant.
    #[rstest]
    #[case(0, NodeState::Idle)]
    #[case(1, NodeState::Starting)]
    #[case(2, NodeState::Running)]
    #[case(3, NodeState::ShuttingDown)]
    #[case(4, NodeState::Stopped)]
    fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
        assert_eq!(NodeState::from_u8(value), expected);
    }

    // Out-of-range discriminants panic with the documented message.
    #[rstest]
    #[case(5)]
    #[case(255)]
    #[should_panic(expected = "Invalid NodeState value")]
    fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
        let _ = NodeState::from_u8(value);
    }

    // `as_u8` followed by `from_u8` returns the original variant.
    #[rstest]
    fn test_node_state_roundtrip() {
        for state in [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ] {
            assert_eq!(NodeState::from_u8(state.as_u8()), state);
        }
    }

    #[rstest]
    fn test_node_state_is_running_only_for_running() {
        assert!(!NodeState::Idle.is_running());
        assert!(!NodeState::Starting.is_running());
        assert!(NodeState::Running.is_running());
        assert!(!NodeState::ShuttingDown.is_running());
        assert!(!NodeState::Stopped.is_running());
    }

    // A new handle is idle with no stop requested.
    #[rstest]
    fn test_handle_initial_state() {
        let handle = LiveNodeHandle::new();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.should_stop());
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_stop_sets_flag() {
        let handle = LiveNodeHandle::new();

        handle.stop();

        assert!(handle.should_stop());
    }

    // Entering `Running` discards an earlier stop request.
    #[rstest]
    fn test_handle_set_state_running_clears_stop_flag() {
        let handle = LiveNodeHandle::new();
        handle.stop();
        assert!(handle.should_stop());

        handle.set_state(NodeState::Running);

        assert!(!handle.should_stop());
        assert!(handle.is_running());
        assert_eq!(handle.state(), NodeState::Running);
    }

    // Walks the handle through every state in lifecycle order.
    #[rstest]
    fn test_handle_node_state_transitions() {
        let handle = LiveNodeHandle::new();
        assert_eq!(handle.state(), NodeState::Idle);

        handle.set_state(NodeState::Starting);
        assert_eq!(handle.state(), NodeState::Starting);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Running);
        assert_eq!(handle.state(), NodeState::Running);
        assert!(handle.is_running());

        handle.set_state(NodeState::ShuttingDown);
        assert_eq!(handle.state(), NodeState::ShuttingDown);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Stopped);
        assert_eq!(handle.state(), NodeState::Stopped);
        assert!(!handle.is_running());
    }

    // Clones share the same atomics in both directions.
    #[rstest]
    fn test_handle_clone_shares_state_bidirectionally() {
        let handle1 = LiveNodeHandle::new();
        let handle2 = handle1.clone();

        handle1.stop();
        assert!(handle2.should_stop());

        handle2.set_state(NodeState::Running);
        assert_eq!(handle1.state(), NodeState::Running);
    }

    // Only the transition into `Running` touches the stop flag.
    #[rstest]
    fn test_handle_stop_flag_independent_of_state() {
        let handle = LiveNodeHandle::new();

        handle.set_state(NodeState::Starting);
        handle.stop();
        assert!(handle.should_stop());
        assert_eq!(handle.state(), NodeState::Starting);

        handle.set_state(NodeState::ShuttingDown);
        assert!(handle.should_stop());
        handle.set_state(NodeState::Running);
        assert!(!handle.should_stop());
    }

    #[rstest]
    fn test_builder_creation() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    // The builder refuses the Backtest environment and says so in the error.
    #[rstest]
    fn test_builder_rejects_backtest() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Backtest"));
    }

    #[rstest]
    fn test_builder_accepts_live_environment() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);

        assert!(result.is_ok());
    }

    // NOTE(review): same arguments and assertion as `test_builder_creation`.
    #[rstest]
    fn test_builder_accepts_sandbox_environment() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    // Chains every builder setter and checks that the name survives the chain.
    #[rstest]
    fn test_builder_fluent_api_chaining() {
        let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
            .unwrap()
            .with_name("TestNode")
            .with_instance_id(UUID4::new())
            .with_load_state(false)
            .with_save_state(true)
            .with_timeout_connection(30)
            .with_timeout_reconciliation(60)
            .with_reconciliation(true)
            .with_reconciliation_lookback_mins(120)
            .with_timeout_portfolio(10)
            .with_timeout_disconnection_secs(5)
            .with_delay_post_stop_secs(3)
            .with_delay_shutdown_secs(10);

        assert_eq!(builder.name(), "TestNode");
    }

    // Compiled only with the `python` feature.
    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_build_and_initial_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        assert_eq!(node.state(), NodeState::Idle);
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
        assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
    }

    // Compiled only with the `python` feature.
    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_handle_reflects_node_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        let handle = node.handle();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.is_running());
    }
}