1use std::{
17 fmt::Debug,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
25use nautilus_common::{
26 actor::{Actor, DataActor},
27 cache::database::CacheDatabaseAdapter,
28 component::Component,
29 enums::{Environment, LogColor},
30 log_info,
31 messages::{DataEvent, ExecutionEvent, data::DataCommand, execution::TradingCommand},
32 timer::TimeEventHandler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36 events::OrderEventAny,
37 identifiers::{StrategyId, TraderId},
38};
39use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
40use nautilus_trading::strategy::Strategy;
41use tabled::{Table, Tabled, settings::Style};
42
43use crate::{
44 builder::LiveNodeBuilder,
45 config::LiveNodeConfig,
46 manager::{ExecutionManager, ExecutionManagerConfig},
47 runner::{AsyncRunner, AsyncRunnerChannels},
48};
49
/// Lifecycle state of a live node.
///
/// The enum is `#[repr(u8)]` so a state can be stored in an `AtomicU8` and
/// converted back with [`NodeState::from_u8`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    /// Initial state (also the `Default`).
    #[default]
    Idle = 0,
    /// Startup is in progress.
    Starting = 1,
    /// The node is running.
    Running = 2,
    /// Shutdown has been initiated but not yet finalized.
    ShuttingDown = 3,
    /// The node has been stopped.
    Stopped = 4,
}

impl NodeState {
    /// Decodes a state from its `u8` discriminant.
    ///
    /// # Panics
    ///
    /// Panics with "Invalid NodeState value" if `value` is greater than 4.
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        // Indexed by discriminant: the order here must mirror the enum declaration.
        const BY_DISCRIMINANT: [NodeState; 5] = [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ];

        let index = value as usize;
        if index >= BY_DISCRIMINANT.len() {
            panic!("Invalid NodeState value");
        }
        BY_DISCRIMINANT[index]
    }

    /// Returns the `u8` discriminant of this state.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` only for [`NodeState::Running`].
    #[must_use]
    pub const fn is_running(&self) -> bool {
        matches!(self, Self::Running)
    }
}
92
93#[derive(Clone, Debug)]
98pub struct LiveNodeHandle {
99 pub(crate) stop_flag: Arc<AtomicBool>,
101 pub(crate) state: Arc<AtomicU8>,
103}
104
105impl Default for LiveNodeHandle {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl LiveNodeHandle {
112 #[must_use]
114 pub fn new() -> Self {
115 Self {
116 stop_flag: Arc::new(AtomicBool::new(false)),
117 state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
118 }
119 }
120
121 pub(crate) fn set_state(&self, state: NodeState) {
123 self.state.store(state.as_u8(), Ordering::Relaxed);
124 if state == NodeState::Running {
125 self.stop_flag.store(false, Ordering::Relaxed);
127 }
128 }
129
130 #[must_use]
132 pub fn state(&self) -> NodeState {
133 NodeState::from_u8(self.state.load(Ordering::Relaxed))
134 }
135
136 #[must_use]
138 pub fn should_stop(&self) -> bool {
139 self.stop_flag.load(Ordering::Relaxed)
140 }
141
142 #[must_use]
144 pub fn is_running(&self) -> bool {
145 self.state().is_running()
146 }
147
148 pub fn stop(&self) {
150 self.stop_flag.store(true, Ordering::Relaxed);
151 }
152}
153
/// Live trading node: owns the system kernel, the async runner, the node
/// configuration, the shared lifecycle handle, and the execution manager.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
pub struct LiveNode {
    /// System kernel the node delegates to for startup, client connections and shutdown.
    kernel: NautilusKernel,
    /// Async runner; `run()` takes it out of the `Option`, so it is `None` afterwards.
    runner: Option<AsyncRunner>,
    /// Node configuration (timeouts and execution engine settings are read from here).
    config: LiveNodeConfig,
    /// Shared state/stop handle; clones are handed out by `handle()`.
    handle: LiveNodeHandle,
    /// Execution manager used for startup reconciliation and external order claims.
    exec_manager: ExecutionManager,
    /// Set by `initiate_shutdown()`; the `run()` loop exits once this instant is reached.
    shutdown_deadline: Option<tokio::time::Instant>,
    // Only initialized (empty) in this file; presumably keeps Python actor objects
    // alive for the node's lifetime (TODO confirm against the Python bindings).
    #[cfg(feature = "python")]
    #[allow(dead_code)]
    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
174
175impl LiveNode {
    /// Assembles a node from already-constructed components.
    ///
    /// The node starts with a fresh handle (`Idle`, no stop requested) and no
    /// shutdown deadline.
    #[must_use]
    pub(crate) fn new_from_builder(
        kernel: NautilusKernel,
        runner: AsyncRunner,
        config: LiveNodeConfig,
        exec_manager: ExecutionManager,
    ) -> Self {
        Self {
            kernel,
            runner: Some(runner),
            config,
            handle: LiveNodeHandle::new(),
            exec_manager,
            shutdown_deadline: None,
            #[cfg(feature = "python")]
            python_actors: Vec::new(),
        }
    }
197
    /// Creates a [`LiveNodeBuilder`] for the given trader ID and environment.
    ///
    /// # Errors
    ///
    /// Returns whatever error `LiveNodeBuilder::new` produces; the tests in this
    /// file expect `Environment::Backtest` to be rejected.
    pub fn builder(
        trader_id: TraderId,
        environment: Environment,
    ) -> anyhow::Result<LiveNodeBuilder> {
        LiveNodeBuilder::new(trader_id, environment)
    }
209
210 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
220 let mut config = config.unwrap_or_default();
221 config.environment = Environment::Live;
222
223 match config.environment() {
224 Environment::Sandbox | Environment::Live => {}
225 Environment::Backtest => {
226 anyhow::bail!("LiveNode cannot be used with Backtest environment");
227 }
228 }
229
230 let runner = AsyncRunner::new();
231 let kernel = NautilusKernel::new(name, config.clone())?;
232
233 let exec_manager_config =
234 ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
235 let exec_manager = ExecutionManager::new(
236 kernel.clock.clone(),
237 kernel.cache.clone(),
238 exec_manager_config,
239 );
240
241 log::info!("LiveNode built successfully with kernel config");
242
243 Ok(Self {
244 kernel,
245 runner: Some(runner),
246 config,
247 handle: LiveNodeHandle::new(),
248 exec_manager,
249 shutdown_deadline: None,
250 #[cfg(feature = "python")]
251 python_actors: Vec::new(),
252 })
253 }
254
    /// Returns a clone of the node's handle; the clone shares the same stop flag
    /// and state as the node.
    #[must_use]
    pub fn handle(&self) -> LiveNodeHandle {
        self.handle.clone()
    }
260
261 pub async fn start(&mut self) -> anyhow::Result<()> {
267 if self.state().is_running() {
268 anyhow::bail!("Already running");
269 }
270
271 self.handle.set_state(NodeState::Starting);
272
273 self.kernel.start_async().await;
274 self.kernel.connect_clients().await;
275
276 if !self.await_engines_connected().await {
277 log::error!("Cannot start trader: engine client(s) not connected");
278 self.handle.set_state(NodeState::Running);
279 return Ok(());
280 }
281
282 if let Some(runner) = self.runner.as_mut() {
284 runner.drain_pending_data_events();
285 }
286
287 self.perform_startup_reconciliation().await?;
288
289 self.kernel.start_trader();
290
291 self.handle.set_state(NodeState::Running);
292
293 Ok(())
294 }
295
296 pub async fn stop(&mut self) -> anyhow::Result<()> {
305 if !self.state().is_running() {
306 anyhow::bail!("Not running");
307 }
308
309 self.handle.set_state(NodeState::ShuttingDown);
310
311 self.kernel.stop_trader();
312 let delay = self.kernel.delay_post_stop();
313 log::info!("Awaiting residual events ({delay:?})...");
314
315 tokio::time::sleep(delay).await;
316 self.finalize_stop().await
317 }
318
319 async fn await_engines_connected(&self) -> bool {
323 log::info!(
324 "Awaiting engine connections ({:?} timeout)...",
325 self.config.timeout_connection
326 );
327
328 let start = Instant::now();
329 let timeout = self.config.timeout_connection;
330 let interval = Duration::from_millis(100);
331
332 while start.elapsed() < timeout {
333 if self.kernel.check_engines_connected() {
334 log::info!("All engine clients connected");
335 return true;
336 }
337 tokio::time::sleep(interval).await;
338 }
339
340 self.log_connection_status();
341 false
342 }
343
344 async fn await_engines_disconnected(&self) {
348 log::info!(
349 "Awaiting engine disconnections ({:?} timeout)...",
350 self.config.timeout_disconnection
351 );
352
353 let start = Instant::now();
354 let timeout = self.config.timeout_disconnection;
355 let interval = Duration::from_millis(100);
356
357 while start.elapsed() < timeout {
358 if self.kernel.check_engines_disconnected() {
359 log::info!("All engine clients disconnected");
360 return;
361 }
362 tokio::time::sleep(interval).await;
363 }
364
365 log::error!(
366 "Timed out ({:?}) waiting for engines to disconnect\n\
367 DataEngine.check_disconnected() == {}\n\
368 ExecEngine.check_disconnected() == {}",
369 timeout,
370 self.kernel.data_engine().check_disconnected(),
371 self.kernel.exec_engine().borrow().check_disconnected(),
372 );
373 }
374
375 fn log_connection_status(&self) {
376 #[derive(Tabled)]
377 struct ClientStatus {
378 #[tabled(rename = "Client")]
379 client: String,
380 #[tabled(rename = "Type")]
381 client_type: &'static str,
382 #[tabled(rename = "Connected")]
383 connected: bool,
384 }
385
386 let data_status = self.kernel.data_client_connection_status();
387 let exec_status = self.kernel.exec_client_connection_status();
388
389 let mut rows: Vec<ClientStatus> = Vec::new();
390
391 for (client_id, connected) in data_status {
392 rows.push(ClientStatus {
393 client: client_id.to_string(),
394 client_type: "Data",
395 connected,
396 });
397 }
398
399 for (client_id, connected) in exec_status {
400 rows.push(ClientStatus {
401 client: client_id.to_string(),
402 client_type: "Execution",
403 connected,
404 });
405 }
406
407 let table = Table::new(&rows).with(Style::rounded()).to_string();
408
409 log::warn!(
410 "Timed out ({:?}) waiting for engines to connect\n\n{table}\n\n\
411 DataEngine.check_connected() == {}\n\
412 ExecEngine.check_connected() == {}",
413 self.config.timeout_connection,
414 self.kernel.data_engine().check_connected(),
415 self.kernel.exec_engine().borrow().check_connected(),
416 );
417 }
418
    /// Reconciles execution state with each execution client at startup.
    ///
    /// Does nothing (beyond a log line) when reconciliation is disabled in the
    /// execution engine config. Otherwise, for each client ID known to the
    /// execution engine, it requests a mass status, hands it to the execution
    /// manager for reconciliation, and registers any external orders reported by
    /// the result. Afterwards the portfolio's orders and positions are initialized.
    ///
    /// Failures to obtain a mass status are logged as warnings and do not abort
    /// the remaining clients. As written, every path returns `Ok(())`.
    // The `borrow_mut()` on the exec engine below is a temporary that lives until
    // the end of the awaited statement, hence the lint allowance.
    #[allow(clippy::await_holding_refcell_ref)]
    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
        if !self.config.exec_engine.reconciliation {
            log::info!("Startup reconciliation disabled");
            return Ok(());
        }

        log_info!(
            "Starting execution state reconciliation...",
            color = LogColor::Blue
        );

        let lookback_mins = self
            .config
            .exec_engine
            .reconciliation_lookback_mins
            .map(|m| m as u64);

        let timeout = self.config.timeout_reconciliation;
        let start = Instant::now();
        // The shared borrow ends with this statement, before any mutable borrow below.
        let client_ids = self.kernel.exec_engine.borrow().client_ids();

        for client_id in client_ids {
            // The timeout is only checked between clients; a request already in
            // flight is not interrupted.
            if start.elapsed() > timeout {
                log::warn!("Reconciliation timeout reached, stopping early");
                break;
            }

            log_info!(
                "Requesting mass status from {}...",
                client_id,
                color = LogColor::Blue
            );

            let mass_status_result = self
                .kernel
                .exec_engine
                .borrow_mut()
                .generate_mass_status(&client_id, lookback_mins)
                .await;

            match mass_status_result {
                Ok(Some(mass_status)) => {
                    log_info!(
                        "Reconciling ExecutionMassStatus for {}",
                        client_id,
                        color = LogColor::Blue
                    );

                    // The manager receives its own clone of the engine handle.
                    let exec_engine_rc = self.kernel.exec_engine.clone();

                    let result = self
                        .exec_manager
                        .reconcile_execution_mass_status(mass_status, exec_engine_rc)
                        .await;

                    if result.events.is_empty() {
                        log_info!(
                            "Reconciliation for {} succeeded",
                            client_id,
                            color = LogColor::Blue
                        );
                    } else {
                        log::info!(
                            color = LogColor::Blue as u8;
                            "Reconciliation for {} processed {} events",
                            client_id,
                            result.events.len()
                        );
                    }

                    // Register each external order from the result with the execution engine.
                    if !result.external_orders.is_empty() {
                        let exec_engine = self.kernel.exec_engine.borrow();
                        for external in result.external_orders {
                            exec_engine.register_external_order(
                                external.client_order_id,
                                external.venue_order_id,
                                external.instrument_id,
                                external.strategy_id,
                                external.ts_init,
                            );
                        }
                    }
                }
                Ok(None) => {
                    log::warn!(
                        "No mass status available from {client_id} \
                         (likely adapter error when generating reports)"
                    );
                }
                Err(e) => {
                    log::warn!("Failed to get mass status from {client_id}: {e}");
                }
            }
        }

        // Runs even if the loop stopped early on timeout.
        self.kernel.portfolio.borrow_mut().initialize_orders();
        self.kernel.portfolio.borrow_mut().initialize_positions();

        let elapsed_secs = start.elapsed().as_secs_f64();
        log_info!(
            "Startup reconciliation completed in {:.2}s",
            elapsed_secs,
            color = LogColor::Blue
        );

        Ok(())
    }
537
538 pub async fn run(&mut self) -> anyhow::Result<()> {
560 if self.state().is_running() {
561 anyhow::bail!("Already running");
562 }
563
564 let Some(runner) = self.runner.take() else {
565 anyhow::bail!("Runner already consumed - run() called twice");
566 };
567
568 let AsyncRunnerChannels {
569 mut time_evt_rx,
570 mut data_evt_rx,
571 mut data_cmd_rx,
572 mut exec_evt_rx,
573 mut exec_cmd_rx,
574 } = runner.take_channels();
575
576 log::info!("Event loop starting");
577
578 self.handle.set_state(NodeState::Starting);
579 self.kernel.start_async().await;
580
581 let stop_handle = self.handle.clone();
582 let mut pending = PendingEvents::default();
583
584 let engines_connected = {
589 let startup_future = self.complete_startup();
590 tokio::pin!(startup_future);
591
592 loop {
593 tokio::select! {
594 biased;
595
596 result = &mut startup_future => {
597 break result?;
598 }
599 Some(handler) = time_evt_rx.recv() => {
600 AsyncRunner::handle_time_event(handler);
601 }
602 Some(evt) = data_evt_rx.recv() => {
603 pending.data_evts.push(evt);
604 }
605 Some(cmd) = data_cmd_rx.recv() => {
606 pending.data_cmds.push(cmd);
607 }
608 Some(evt) = exec_evt_rx.recv() => {
609 match evt {
611 ExecutionEvent::Account(_) | ExecutionEvent::Report(_) => {
612 AsyncRunner::handle_exec_event(evt);
613 }
614 ExecutionEvent::Order(order_evt) => {
615 pending.order_evts.push(order_evt);
616 }
617 }
618 }
619 Some(cmd) = exec_cmd_rx.recv() => {
620 pending.exec_cmds.push(cmd);
621 }
622 }
623 }
624 };
625
626 pending.drain();
627
628 if engines_connected {
629 self.perform_startup_reconciliation().await?;
631 self.kernel.start_trader();
632 } else {
633 log::error!("Not starting trader: engine client(s) not connected");
634 }
635
636 self.handle.set_state(NodeState::Running);
637
638 let mut residual_events = 0usize;
640
641 loop {
642 let shutdown_deadline = self.shutdown_deadline;
643 let is_shutting_down = self.state() == NodeState::ShuttingDown;
644
645 tokio::select! {
646 Some(handler) = time_evt_rx.recv() => {
647 AsyncRunner::handle_time_event(handler);
648 if is_shutting_down {
649 log::debug!("Residual time event");
650 residual_events += 1;
651 }
652 }
653 Some(evt) = data_evt_rx.recv() => {
654 if is_shutting_down {
655 log::debug!("Residual data event: {evt:?}");
656 residual_events += 1;
657 }
658 AsyncRunner::handle_data_event(evt);
659 }
660 Some(cmd) = data_cmd_rx.recv() => {
661 if is_shutting_down {
662 log::debug!("Residual data command: {cmd:?}");
663 residual_events += 1;
664 }
665 AsyncRunner::handle_data_command(cmd);
666 }
667 Some(evt) = exec_evt_rx.recv() => {
668 if is_shutting_down {
669 log::debug!("Residual exec event: {evt:?}");
670 residual_events += 1;
671 }
672 AsyncRunner::handle_exec_event(evt);
673 }
674 Some(cmd) = exec_cmd_rx.recv() => {
675 if is_shutting_down {
676 log::debug!("Residual exec command: {cmd:?}");
677 residual_events += 1;
678 }
679 AsyncRunner::handle_exec_command(cmd);
680 }
681 result = tokio::signal::ctrl_c(), if self.state() == NodeState::Running => {
682 match result {
683 Ok(()) => log::info!("Received SIGINT, shutting down"),
684 Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
685 }
686 self.initiate_shutdown();
687 }
688 () = async {
689 loop {
690 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
691 if stop_handle.should_stop() {
692 log::info!("Received stop signal from handle");
693 return;
694 }
695 }
696 }, if self.state() == NodeState::Running => {
697 self.initiate_shutdown();
698 }
699 () = async {
700 match shutdown_deadline {
701 Some(deadline) => tokio::time::sleep_until(deadline).await,
702 None => std::future::pending::<()>().await,
703 }
704 }, if self.state() == NodeState::ShuttingDown => {
705 break;
706 }
707 }
708 }
709
710 if residual_events > 0 {
711 log::debug!("Processed {residual_events} residual events during shutdown");
712 }
713
714 let _ = self.kernel.cache().borrow().check_residuals();
715
716 self.finalize_stop().await?;
717
718 self.drain_channels(
720 &mut time_evt_rx,
721 &mut data_evt_rx,
722 &mut data_cmd_rx,
723 &mut exec_evt_rx,
724 &mut exec_cmd_rx,
725 );
726
727 log::info!("Event loop stopped");
728
729 Ok(())
730 }
731
732 async fn complete_startup(&mut self) -> anyhow::Result<bool> {
735 self.kernel.connect_clients().await;
736
737 if !self.await_engines_connected().await {
738 return Ok(false);
739 }
740
741 Ok(true)
742 }
743
    /// Begins shutdown: stops the trader, records the deadline until which
    /// residual events are still processed, and moves to `ShuttingDown`.
    fn initiate_shutdown(&mut self) {
        self.kernel.stop_trader();
        let delay = self.kernel.delay_post_stop();
        log::info!("Awaiting residual events ({delay:?})...");

        // The `run()` loop breaks out once this instant is reached.
        self.shutdown_deadline = Some(tokio::time::Instant::now() + delay);
        self.handle.set_state(NodeState::ShuttingDown);
    }
752
    /// Disconnects clients, waits for the engines to disconnect, finalizes the
    /// kernel stop and marks the node `Stopped`.
    ///
    /// # Errors
    ///
    /// Returns an error if disconnecting the clients fails; in that case the
    /// remaining steps are skipped and the state is left unchanged.
    async fn finalize_stop(&mut self) -> anyhow::Result<()> {
        self.kernel.disconnect_clients().await?;
        self.await_engines_disconnected().await;
        self.kernel.finalize_stop().await;

        self.handle.set_state(NodeState::Stopped);

        Ok(())
    }
762
763 fn drain_channels(
764 &self,
765 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
766 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
767 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
768 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
769 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
770 ) {
771 let mut drained = 0;
772
773 while let Ok(handler) = time_evt_rx.try_recv() {
774 AsyncRunner::handle_time_event(handler);
775 drained += 1;
776 }
777 while let Ok(cmd) = data_cmd_rx.try_recv() {
778 AsyncRunner::handle_data_command(cmd);
779 drained += 1;
780 }
781 while let Ok(evt) = data_evt_rx.try_recv() {
782 AsyncRunner::handle_data_event(evt);
783 drained += 1;
784 }
785 while let Ok(cmd) = exec_cmd_rx.try_recv() {
786 AsyncRunner::handle_exec_command(cmd);
787 drained += 1;
788 }
789 while let Ok(evt) = exec_evt_rx.try_recv() {
790 AsyncRunner::handle_exec_event(evt);
791 drained += 1;
792 }
793
794 if drained > 0 {
795 log::info!("Drained {drained} remaining events during shutdown");
796 }
797 }
798
    /// Returns the environment reported by the kernel.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.kernel.environment()
    }
804
    /// Returns a shared reference to the underlying kernel.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
810
    /// Returns a mutable reference to the underlying kernel.
    #[must_use]
    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
816
    /// Returns the trader ID reported by the kernel.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.kernel.trader_id()
    }
822
    /// Returns the instance ID reported by the kernel.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.kernel.instance_id()
    }
828
    /// Returns the node's current lifecycle state, read from its handle.
    #[must_use]
    pub fn state(&self) -> NodeState {
        self.handle.state()
    }
834
    /// Returns `true` if the node's current state is `Running`.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.state().is_running()
    }
840
841 pub fn set_cache_database(
851 &mut self,
852 database: Box<dyn CacheDatabaseAdapter>,
853 ) -> anyhow::Result<()> {
854 if self.state() != NodeState::Idle {
855 anyhow::bail!(
856 "Cannot set cache database while node is running, set it before calling start()"
857 );
858 }
859
860 self.kernel.cache().borrow_mut().set_database(database);
861 Ok(())
862 }
863
    /// Returns a shared reference to the execution manager.
    #[must_use]
    pub const fn exec_manager(&self) -> &ExecutionManager {
        &self.exec_manager
    }
869
870 #[must_use]
872 pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
873 &mut self.exec_manager
874 }
875
876 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
889 where
890 T: DataActor + Component + Actor + 'static,
891 {
892 if self.state() != NodeState::Idle {
893 anyhow::bail!(
894 "Cannot add actor while node is running, add actors before calling start()"
895 );
896 }
897
898 self.kernel.trader.add_actor(actor)
899 }
900
901 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
913 where
914 F: FnOnce() -> anyhow::Result<T>,
915 T: DataActor + Component + Actor + 'static,
916 {
917 if self.state() != NodeState::Idle {
918 anyhow::bail!(
919 "Cannot add actor while node is running, add actors before calling start()"
920 );
921 }
922
923 self.kernel.trader.add_actor_from_factory(factory)
924 }
925
926 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
937 where
938 T: Strategy + Component + Debug + 'static,
939 {
940 if self.state() != NodeState::Idle {
941 anyhow::bail!(
942 "Cannot add strategy while node is running, add strategies before calling start()"
943 );
944 }
945
946 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
948 if let Some(claims) = strategy.external_order_claims() {
949 for instrument_id in claims {
950 self.exec_manager
951 .claim_external_orders(instrument_id, strategy_id);
952 }
953 log_info!(
954 "Registered external order claims for {}: {:?}",
955 strategy_id,
956 strategy.external_order_claims(),
957 color = LogColor::Blue
958 );
959 }
960
961 self.kernel.trader.add_strategy(strategy)
962 }
963}
964
965#[derive(Default)]
971struct PendingEvents {
972 data_cmds: Vec<DataCommand>,
973 data_evts: Vec<DataEvent>,
974 exec_cmds: Vec<TradingCommand>,
975 order_evts: Vec<OrderEventAny>,
976}
977
978impl PendingEvents {
979 fn drain(&mut self) {
980 let total = self.data_evts.len()
981 + self.data_cmds.len()
982 + self.exec_cmds.len()
983 + self.order_evts.len();
984
985 if total > 0 {
986 log::debug!(
987 "Processing {total} events/commands queued during startup \
988 (data_evts={}, data_cmds={}, exec_cmds={}, order_evts={})",
989 self.data_evts.len(),
990 self.data_cmds.len(),
991 self.exec_cmds.len(),
992 self.order_evts.len()
993 );
994 }
995
996 for evt in self.data_evts.drain(..) {
997 AsyncRunner::handle_data_event(evt);
998 }
999 for cmd in self.data_cmds.drain(..) {
1000 AsyncRunner::handle_data_command(cmd);
1001 }
1002 for cmd in self.exec_cmds.drain(..) {
1003 AsyncRunner::handle_exec_command(cmd);
1004 }
1005 for evt in self.order_evts.drain(..) {
1006 AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
1007 }
1008 }
1009}
1010
// Unit tests for `NodeState`, `LiveNodeHandle` and the `LiveNode` builder entry point.
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    // Every valid discriminant decodes to its variant.
    #[rstest]
    #[case(0, NodeState::Idle)]
    #[case(1, NodeState::Starting)]
    #[case(2, NodeState::Running)]
    #[case(3, NodeState::ShuttingDown)]
    #[case(4, NodeState::Stopped)]
    fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
        assert_eq!(NodeState::from_u8(value), expected);
    }

    // Out-of-range discriminants panic.
    #[rstest]
    #[case(5)]
    #[case(255)]
    #[should_panic(expected = "Invalid NodeState value")]
    fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
        let _ = NodeState::from_u8(value);
    }

    // `as_u8` followed by `from_u8` is the identity for every variant.
    #[rstest]
    fn test_node_state_roundtrip() {
        for state in [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ] {
            assert_eq!(NodeState::from_u8(state.as_u8()), state);
        }
    }

    #[rstest]
    fn test_node_state_is_running_only_for_running() {
        assert!(!NodeState::Idle.is_running());
        assert!(!NodeState::Starting.is_running());
        assert!(NodeState::Running.is_running());
        assert!(!NodeState::ShuttingDown.is_running());
        assert!(!NodeState::Stopped.is_running());
    }

    // A fresh handle is idle with no stop requested.
    #[rstest]
    fn test_handle_initial_state() {
        let handle = LiveNodeHandle::new();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.should_stop());
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_stop_sets_flag() {
        let handle = LiveNodeHandle::new();

        handle.stop();

        assert!(handle.should_stop());
    }

    // Entering `Running` resets a stop request made beforehand.
    #[rstest]
    fn test_handle_set_state_running_clears_stop_flag() {
        let handle = LiveNodeHandle::new();
        handle.stop();
        assert!(handle.should_stop());

        handle.set_state(NodeState::Running);

        assert!(!handle.should_stop());
        assert!(handle.is_running());
        assert_eq!(handle.state(), NodeState::Running);
    }

    // Walks the full lifecycle and checks `is_running` at each step.
    #[rstest]
    fn test_handle_node_state_transitions() {
        let handle = LiveNodeHandle::new();
        assert_eq!(handle.state(), NodeState::Idle);

        handle.set_state(NodeState::Starting);
        assert_eq!(handle.state(), NodeState::Starting);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Running);
        assert_eq!(handle.state(), NodeState::Running);
        assert!(handle.is_running());

        handle.set_state(NodeState::ShuttingDown);
        assert_eq!(handle.state(), NodeState::ShuttingDown);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Stopped);
        assert_eq!(handle.state(), NodeState::Stopped);
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_clone_shares_state_bidirectionally() {
        let handle1 = LiveNodeHandle::new();
        let handle2 = handle1.clone();

        // A stop requested through one handle is visible through the other.
        handle1.stop();
        assert!(handle2.should_stop());

        // A state set through the clone is visible through the original.
        handle2.set_state(NodeState::Running);
        assert_eq!(handle1.state(), NodeState::Running);
    }

    #[rstest]
    fn test_handle_stop_flag_independent_of_state() {
        let handle = LiveNodeHandle::new();

        handle.set_state(NodeState::Starting);
        handle.stop();
        assert!(handle.should_stop());
        assert_eq!(handle.state(), NodeState::Starting);

        // Only the transition to `Running` clears the flag.
        handle.set_state(NodeState::ShuttingDown);
        assert!(handle.should_stop());
        handle.set_state(NodeState::Running);
        assert!(!handle.should_stop());
    }

    #[rstest]
    fn test_builder_creation() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    // The builder refuses the backtest environment and says so in the error.
    #[rstest]
    fn test_builder_rejects_backtest() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Backtest"));
    }

    #[rstest]
    fn test_builder_accepts_live_environment() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);

        assert!(result.is_ok());
    }

    // NOTE(review): same inputs and assertion as `test_builder_creation`.
    #[rstest]
    fn test_builder_accepts_sandbox_environment() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    // Chains every builder setter once; only the name is asserted afterwards.
    #[rstest]
    fn test_builder_fluent_api_chaining() {
        let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
            .unwrap()
            .with_name("TestNode")
            .with_instance_id(UUID4::new())
            .with_load_state(false)
            .with_save_state(true)
            .with_timeout_connection(30)
            .with_timeout_reconciliation(60)
            .with_reconciliation(true)
            .with_reconciliation_lookback_mins(120)
            .with_timeout_portfolio(10)
            .with_timeout_disconnection_secs(5)
            .with_delay_post_stop_secs(3)
            .with_delay_shutdown_secs(10);

        assert_eq!(builder.name(), "TestNode");
    }

    // Only compiled with the `python` feature.
    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_build_and_initial_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        assert_eq!(node.state(), NodeState::Idle);
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
        assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
    }

    // Only compiled with the `python` feature.
    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_handle_reflects_node_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        let handle = node.handle();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.is_running());
    }
}