1use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
23
24use nautilus_common::{
25 actor::{DataActor, registry::try_get_actor_unchecked},
26 cache::Cache,
27 clock::{Clock, TestClock},
28 component::{
29 Component, dispose_component, register_component_actor, reset_component, start_component,
30 stop_component,
31 },
32 enums::{ComponentState, ComponentTrigger, Environment},
33 msgbus,
34 msgbus::{
35 handler::{ShareableMessageHandler, TypedMessageHandler},
36 switchboard::{get_event_orders_topic, get_event_positions_topic},
37 },
38 timer::{TimeEvent, TimeEventCallback},
39};
40use nautilus_core::{UUID4, UnixNanos};
41use nautilus_model::{
42 events::{OrderEventAny, PositionEvent},
43 identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
44};
45use nautilus_portfolio::portfolio::Portfolio;
46use nautilus_trading::strategy::Strategy;
47
48pub struct Trader {
54 pub trader_id: TraderId,
56 pub instance_id: UUID4,
58 pub environment: Environment,
60 state: ComponentState,
62 clock: Rc<RefCell<dyn Clock>>,
64 cache: Rc<RefCell<Cache>>,
66 portfolio: Rc<RefCell<Portfolio>>,
68 actor_ids: Vec<ActorId>,
70 strategy_ids: Vec<StrategyId>,
72 exec_algorithm_ids: Vec<ExecAlgorithmId>,
74 clocks: HashMap<ComponentId, Rc<RefCell<dyn Clock>>>, ts_created: UnixNanos,
78 ts_started: Option<UnixNanos>,
80 ts_stopped: Option<UnixNanos>,
82}
83
84impl Debug for Trader {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{:?}", stringify!(TraderId)) }
88}
89
90impl Trader {
91 #[must_use]
93 pub fn new(
94 trader_id: TraderId,
95 instance_id: UUID4,
96 environment: Environment,
97 clock: Rc<RefCell<dyn Clock>>,
98 cache: Rc<RefCell<Cache>>,
99 portfolio: Rc<RefCell<Portfolio>>,
100 ) -> Self {
101 let ts_created = clock.borrow().timestamp_ns();
102
103 Self {
104 trader_id,
105 instance_id,
106 environment,
107 state: ComponentState::PreInitialized,
108 clock,
109 cache,
110 portfolio,
111 actor_ids: Vec::new(),
112 strategy_ids: Vec::new(),
113 exec_algorithm_ids: Vec::new(),
114 clocks: HashMap::new(),
115 ts_created,
116 ts_started: None,
117 ts_stopped: None,
118 }
119 }
120
121 #[must_use]
123 pub const fn trader_id(&self) -> TraderId {
124 self.trader_id
125 }
126
127 #[must_use]
129 pub const fn instance_id(&self) -> UUID4 {
130 self.instance_id
131 }
132
133 #[must_use]
135 pub const fn environment(&self) -> Environment {
136 self.environment
137 }
138
139 #[must_use]
141 pub const fn state(&self) -> ComponentState {
142 self.state
143 }
144
145 #[must_use]
147 pub const fn ts_created(&self) -> UnixNanos {
148 self.ts_created
149 }
150
151 #[must_use]
153 pub const fn ts_started(&self) -> Option<UnixNanos> {
154 self.ts_started
155 }
156
157 #[must_use]
159 pub const fn ts_stopped(&self) -> Option<UnixNanos> {
160 self.ts_stopped
161 }
162
163 #[must_use]
165 pub const fn actor_count(&self) -> usize {
166 self.actor_ids.len()
167 }
168
169 #[must_use]
171 pub const fn strategy_count(&self) -> usize {
172 self.strategy_ids.len()
173 }
174
175 #[must_use]
177 pub const fn exec_algorithm_count(&self) -> usize {
178 self.exec_algorithm_ids.len()
179 }
180
181 #[must_use]
183 pub const fn component_count(&self) -> usize {
184 self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
185 }
186
187 #[must_use]
189 pub fn actor_ids(&self) -> Vec<ActorId> {
190 self.actor_ids.clone()
191 }
192
193 #[must_use]
195 pub fn strategy_ids(&self) -> Vec<StrategyId> {
196 self.strategy_ids.clone()
197 }
198
199 #[must_use]
201 pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
202 self.exec_algorithm_ids.clone()
203 }
204
205 fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
210 match self.environment {
211 Environment::Backtest => {
212 Rc::new(RefCell::new(TestClock::new()))
214 }
215 Environment::Live | Environment::Sandbox => {
216 self.clock.clone()
218 }
219 }
220 }
221
222 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
230 where
231 T: DataActor + Component + Debug + 'static,
232 {
233 self.validate_component_registration()?;
234
235 let actor_id = actor.actor_id();
236
237 if self.actor_ids.contains(&actor_id) {
239 anyhow::bail!("Actor {actor_id} is already registered");
240 }
241
242 let clock = self.create_component_clock();
243 let component_id = ComponentId::new(actor_id.inner().as_str());
244 self.clocks.insert(component_id, clock.clone());
245
246 let mut actor_mut = actor;
247 actor_mut.register(self.trader_id, clock, self.cache.clone())?;
248
249 self.add_registered_actor(actor_mut)
250 }
251
252 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
264 where
265 F: FnOnce() -> anyhow::Result<T>,
266 T: DataActor + Component + Debug + 'static,
267 {
268 let actor = factory()?;
269
270 self.add_actor(actor)
271 }
272
273 pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
279 where
280 T: DataActor + Component + Debug + 'static,
281 {
282 let actor_id = actor.actor_id();
283
284 register_component_actor(actor);
286
287 self.actor_ids.push(actor_id);
289
290 log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
291
292 Ok(())
293 }
294
295 pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
305 if self.actor_ids.contains(&actor_id) {
307 anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
308 }
309
310 self.actor_ids.push(actor_id);
312
313 log::debug!(
314 "Added actor ID '{actor_id}' to trader {} for lifecycle management",
315 self.trader_id
316 );
317
318 Ok(())
319 }
320
321 pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
333 where
334 T: Strategy + Component + Debug + 'static,
335 {
336 self.validate_component_registration()?;
337
338 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
339
340 if self.strategy_ids.contains(&strategy_id) {
342 anyhow::bail!("Strategy {strategy_id} is already registered");
343 }
344
345 let clock = self.create_component_clock();
346 let component_id = strategy.component_id();
347 self.clocks.insert(component_id, clock.clone());
348
349 strategy.core_mut().register(
351 self.trader_id,
352 clock.clone(),
353 self.cache.clone(),
354 self.portfolio.clone(),
355 )?;
356
357 let actor_id = strategy.actor_id().inner();
359 let callback = TimeEventCallback::from(move |event: TimeEvent| {
360 if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
361 actor.handle_time_event(&event);
362 } else {
363 log::error!("Strategy {actor_id} not found for time event handling");
364 }
365 });
366 clock.borrow_mut().register_default_handler(callback);
367
368 strategy.initialize()?;
370
371 register_component_actor(strategy);
373
374 let order_topic = get_event_orders_topic(strategy_id);
375 let order_actor_id = actor_id;
376 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
377 move |event: &OrderEventAny| {
378 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
379 strategy.handle_order_event(event.clone());
380 } else {
381 log::error!("Strategy {order_actor_id} not found for order event handling");
382 }
383 },
384 )));
385 msgbus::subscribe_topic(order_topic, handler, None);
386
387 let position_topic = get_event_positions_topic(strategy_id);
388 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
389 move |event: &PositionEvent| {
390 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
391 strategy.handle_position_event(event.clone());
392 } else {
393 log::error!("Strategy {actor_id} not found for position event handling");
394 }
395 },
396 )));
397 msgbus::subscribe_topic(position_topic, handler, None);
398
399 self.strategy_ids.push(strategy_id);
400
401 log::info!(
402 "Registered strategy {strategy_id} with trader {}",
403 self.trader_id
404 );
405
406 Ok(())
407 }
408
409 pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
420 where
421 T: DataActor + Component + Debug + 'static,
422 {
423 self.validate_component_registration()?;
424
425 let exec_algorithm_id =
426 ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
427
428 if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
430 anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
431 }
432
433 let clock = self.create_component_clock();
434 let component_id = exec_algorithm.component_id();
435 self.clocks.insert(component_id, clock.clone());
436
437 exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
438
439 register_component_actor(exec_algorithm);
441
442 self.exec_algorithm_ids.push(exec_algorithm_id);
443
444 log::info!(
445 "Registered execution algorithm {exec_algorithm_id} with trader {}",
446 self.trader_id
447 );
448
449 Ok(())
450 }
451
452 fn validate_component_registration(&self) -> anyhow::Result<()> {
454 match self.state {
455 ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
456 Ok(())
457 }
458 ComponentState::Running => {
459 anyhow::bail!("Cannot add components while trader is running")
460 }
461 ComponentState::Disposed => {
462 anyhow::bail!("Cannot add components to disposed trader")
463 }
464 _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
465 }
466 }
467
468 pub fn start_components(&mut self) -> anyhow::Result<()> {
474 for actor_id in &self.actor_ids {
475 log::debug!("Starting actor {actor_id}");
476 start_component(&actor_id.inner())?;
477 }
478
479 for strategy_id in &self.strategy_ids {
480 log::debug!("Starting strategy {strategy_id}");
481 start_component(&strategy_id.inner())?;
482 }
483
484 for exec_algorithm_id in &self.exec_algorithm_ids {
485 log::debug!("Starting execution algorithm {exec_algorithm_id}");
486 start_component(&exec_algorithm_id.inner())?;
487 }
488
489 Ok(())
490 }
491
492 pub fn stop_components(&mut self) -> anyhow::Result<()> {
498 for actor_id in &self.actor_ids {
499 log::debug!("Stopping actor {actor_id}");
500 stop_component(&actor_id.inner())?;
501 }
502
503 for exec_algorithm_id in &self.exec_algorithm_ids {
504 log::debug!("Stopping execution algorithm {exec_algorithm_id}");
505 stop_component(&exec_algorithm_id.inner())?;
506 }
507
508 for strategy_id in &self.strategy_ids {
509 log::debug!("Stopping strategy {strategy_id}");
510 stop_component(&strategy_id.inner())?;
511 }
512
513 Ok(())
514 }
515
516 pub fn reset_components(&mut self) -> anyhow::Result<()> {
522 for actor_id in &self.actor_ids {
523 log::debug!("Resetting actor {actor_id}");
524 reset_component(&actor_id.inner())?;
525 }
526
527 for strategy_id in &self.strategy_ids {
528 log::debug!("Resetting strategy {strategy_id}");
529 reset_component(&strategy_id.inner())?;
530 }
531
532 for exec_algorithm_id in &self.exec_algorithm_ids {
533 log::debug!("Resetting execution algorithm {exec_algorithm_id}");
534 reset_component(&exec_algorithm_id.inner())?;
535 }
536
537 Ok(())
538 }
539
540 pub fn dispose_components(&mut self) -> anyhow::Result<()> {
546 for actor_id in &self.actor_ids {
547 log::debug!("Disposing actor {actor_id}");
548 dispose_component(&actor_id.inner())?;
549 }
550
551 for strategy_id in &self.strategy_ids {
552 log::debug!("Disposing strategy {strategy_id}");
553 dispose_component(&strategy_id.inner())?;
554 }
555
556 for exec_algorithm_id in &self.exec_algorithm_ids {
557 log::debug!("Disposing execution algorithm {exec_algorithm_id}");
558 dispose_component(&exec_algorithm_id.inner())?;
559 }
560
561 self.actor_ids.clear();
562 self.strategy_ids.clear();
563 self.exec_algorithm_ids.clear();
564 self.clocks.clear();
565
566 Ok(())
567 }
568
569 pub fn initialize(&mut self) -> anyhow::Result<()> {
577 let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
578 self.state = new_state;
579
580 Ok(())
581 }
582
583 fn on_start(&mut self) -> anyhow::Result<()> {
584 self.start_components()?;
585
586 self.ts_started = Some(self.clock.borrow().timestamp_ns());
588
589 Ok(())
590 }
591
592 fn on_stop(&mut self) -> anyhow::Result<()> {
593 self.stop_components()?;
594
595 self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
596
597 Ok(())
598 }
599
600 fn on_reset(&mut self) -> anyhow::Result<()> {
601 self.reset_components()?;
602
603 self.ts_started = None;
604 self.ts_stopped = None;
605
606 Ok(())
607 }
608
609 fn on_dispose(&mut self) -> anyhow::Result<()> {
610 if self.is_running() {
611 self.stop()?;
612 }
613
614 self.dispose_components()?;
615
616 Ok(())
617 }
618}
619
620impl Component for Trader {
621 fn component_id(&self) -> ComponentId {
622 ComponentId::new(format!("Trader-{}", self.trader_id))
623 }
624
625 fn state(&self) -> ComponentState {
626 self.state
627 }
628
629 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
630 self.state = self.state.transition(&trigger)?;
631 log::info!("{}", self.state.variant_name());
632 Ok(())
633 }
634
635 fn register(
636 &mut self,
637 _trader_id: TraderId,
638 _clock: Rc<RefCell<dyn Clock>>,
639 _cache: Rc<RefCell<Cache>>,
640 ) -> anyhow::Result<()> {
641 anyhow::bail!("Trader cannot register with itself")
642 }
643
644 fn on_start(&mut self) -> anyhow::Result<()> {
645 Self::on_start(self)
646 }
647
648 fn on_stop(&mut self) -> anyhow::Result<()> {
649 Self::on_stop(self)
650 }
651
652 fn on_reset(&mut self) -> anyhow::Result<()> {
653 Self::on_reset(self)
654 }
655
656 fn on_dispose(&mut self) -> anyhow::Result<()> {
657 Self::on_dispose(self)
658 }
659}
660
661#[cfg(test)]
662mod tests {
663 use std::{
664 cell::RefCell,
665 ops::{Deref, DerefMut},
666 rc::Rc,
667 };
668
669 use nautilus_common::{
670 actor::{DataActorCore, data_actor::DataActorConfig},
671 cache::Cache,
672 clock::TestClock,
673 enums::{ComponentState, Environment},
674 msgbus::MessageBus,
675 };
676 use nautilus_core::UUID4;
677 use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
678 use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
679 use nautilus_model::{
680 identifiers::{ActorId, ComponentId, TraderId},
681 stubs::TestDefault,
682 };
683 use nautilus_portfolio::portfolio::Portfolio;
684 use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
685 use nautilus_trading::strategy::{
686 Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
687 };
688 use rstest::rstest;
689
690 use super::*;
691
692 #[derive(Debug)]
694 struct TestDataActor {
695 core: DataActorCore,
696 }
697
698 impl TestDataActor {
699 fn new(config: DataActorConfig) -> Self {
700 Self {
701 core: DataActorCore::new(config),
702 }
703 }
704 }
705
706 impl DataActor for TestDataActor {}
707
708 impl Deref for TestDataActor {
709 type Target = DataActorCore;
710 fn deref(&self) -> &Self::Target {
711 &self.core
712 }
713 }
714
715 impl DerefMut for TestDataActor {
716 fn deref_mut(&mut self) -> &mut Self::Target {
717 &mut self.core
718 }
719 }
720
721 #[derive(Debug)]
723 struct TestStrategy {
724 core: StrategyCore,
725 }
726
727 impl TestStrategy {
728 fn new(config: StrategyConfig) -> Self {
729 Self {
730 core: StrategyCore::new(config),
731 }
732 }
733 }
734
735 impl DataActor for TestStrategy {}
736
737 impl Deref for TestStrategy {
739 type Target = DataActorCore;
740 fn deref(&self) -> &Self::Target {
741 &self.core
742 }
743 }
744
745 impl DerefMut for TestStrategy {
746 fn deref_mut(&mut self) -> &mut Self::Target {
747 &mut self.core
748 }
749 }
750
751 impl StrategyTrait for TestStrategy {
752 fn core_mut(&mut self) -> &mut StrategyCore {
753 &mut self.core
754 }
755 }
756
757 #[allow(clippy::type_complexity)]
758 fn create_trader_components() -> (
759 Rc<RefCell<MessageBus>>,
760 Rc<RefCell<Cache>>,
761 Rc<RefCell<Portfolio>>,
762 Rc<RefCell<DataEngine>>,
763 Rc<RefCell<RiskEngine>>,
764 Rc<RefCell<ExecutionEngine>>,
765 Rc<RefCell<TestClock>>,
766 ) {
767 let trader_id = TraderId::test_default();
768 let instance_id = UUID4::new();
769 let clock = Rc::new(RefCell::new(TestClock::new()));
770 clock.borrow_mut().set_time(1_000_000_000u64.into());
772 let msgbus = Rc::new(RefCell::new(MessageBus::new(
773 trader_id,
774 instance_id,
775 Some("test".to_string()),
776 None,
777 )));
778 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
779 let portfolio = Rc::new(RefCell::new(Portfolio::new(
780 cache.clone(),
781 clock.clone() as Rc<RefCell<dyn Clock>>,
782 None,
783 )));
784 let data_engine = Rc::new(RefCell::new(DataEngine::new(
785 clock.clone(),
786 cache.clone(),
787 Some(DataEngineConfig::default()),
788 )));
789
790 let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
792 let risk_clock = Rc::new(RefCell::new(TestClock::new()));
793 let risk_portfolio = Portfolio::new(
794 risk_cache.clone(),
795 risk_clock.clone() as Rc<RefCell<dyn Clock>>,
796 None,
797 );
798 let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
799 RiskEngineConfig::default(),
800 risk_portfolio,
801 risk_clock as Rc<RefCell<dyn Clock>>,
802 risk_cache,
803 )));
804 let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
805 clock.clone(),
806 cache.clone(),
807 Some(ExecutionEngineConfig::default()),
808 )));
809
810 (
811 msgbus,
812 cache,
813 portfolio,
814 data_engine,
815 risk_engine,
816 exec_engine,
817 clock,
818 )
819 }
820
821 #[rstest]
822 fn test_trader_creation() {
823 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
824 create_trader_components();
825 let trader_id = TraderId::test_default();
826 let instance_id = UUID4::new();
827
828 let trader = Trader::new(
829 trader_id,
830 instance_id,
831 Environment::Backtest,
832 clock,
833 cache,
834 portfolio,
835 );
836
837 assert_eq!(trader.trader_id(), trader_id);
838 assert_eq!(trader.instance_id(), instance_id);
839 assert_eq!(trader.environment(), Environment::Backtest);
840 assert_eq!(trader.state(), ComponentState::PreInitialized);
841 assert_eq!(trader.actor_count(), 0);
842 assert_eq!(trader.strategy_count(), 0);
843 assert_eq!(trader.exec_algorithm_count(), 0);
844 assert_eq!(trader.component_count(), 0);
845 assert!(!trader.is_running());
846 assert!(!trader.is_stopped());
847 assert!(!trader.is_disposed());
848 assert!(trader.ts_created() > 0);
849 assert!(trader.ts_started().is_none());
850 assert!(trader.ts_stopped().is_none());
851 }
852
853 #[rstest]
854 fn test_trader_component_id() {
855 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
856 create_trader_components();
857 let trader_id = TraderId::from("TRADER-001");
858 let instance_id = UUID4::new();
859
860 let trader = Trader::new(
861 trader_id,
862 instance_id,
863 Environment::Backtest,
864 clock,
865 cache,
866 portfolio,
867 );
868
869 assert_eq!(
870 trader.component_id(),
871 ComponentId::from("Trader-TRADER-001")
872 );
873 }
874
875 #[rstest]
876 fn test_add_actor_success() {
877 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
878 create_trader_components();
879 let trader_id = TraderId::test_default();
880 let instance_id = UUID4::new();
881
882 let mut trader = Trader::new(
883 trader_id,
884 instance_id,
885 Environment::Backtest,
886 clock,
887 cache,
888 portfolio,
889 );
890
891 let actor = TestDataActor::new(DataActorConfig::default());
892 let actor_id = actor.actor_id();
893
894 let result = trader.add_actor(actor);
895 assert!(result.is_ok());
896 assert_eq!(trader.actor_count(), 1);
897 assert_eq!(trader.component_count(), 1);
898 assert!(trader.actor_ids().contains(&actor_id));
899 }
900
901 #[rstest]
902 fn test_add_duplicate_actor_fails() {
903 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
904 create_trader_components();
905 let trader_id = TraderId::test_default();
906 let instance_id = UUID4::new();
907
908 let mut trader = Trader::new(
909 trader_id,
910 instance_id,
911 Environment::Backtest,
912 clock,
913 cache,
914 portfolio,
915 );
916
917 let config = DataActorConfig {
918 actor_id: Some(ActorId::from("TestActor")),
919 ..Default::default()
920 };
921 let actor1 = TestDataActor::new(config.clone());
922 let actor2 = TestDataActor::new(config);
923
924 assert!(trader.add_actor(actor1).is_ok());
926 assert_eq!(trader.actor_count(), 1);
927
928 let result = trader.add_actor(actor2);
930 assert!(result.is_err());
931 assert!(
932 result
933 .unwrap_err()
934 .to_string()
935 .contains("already registered")
936 );
937 assert_eq!(trader.actor_count(), 1);
938 }
939
940 #[rstest]
941 fn test_add_strategy_success() {
942 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
943 create_trader_components();
944 let trader_id = TraderId::test_default();
945 let instance_id = UUID4::new();
946
947 let mut trader = Trader::new(
948 trader_id,
949 instance_id,
950 Environment::Backtest,
951 clock,
952 cache,
953 portfolio,
954 );
955
956 let config = StrategyConfig {
957 strategy_id: Some(StrategyId::from("Test-Strategy")),
958 ..Default::default()
959 };
960 let strategy = TestStrategy::new(config);
961 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
962
963 let result = trader.add_strategy(strategy);
964 assert!(result.is_ok());
965 assert_eq!(trader.strategy_count(), 1);
966 assert_eq!(trader.component_count(), 1);
967 assert!(trader.strategy_ids().contains(&strategy_id));
968 }
969
970 #[rstest]
971 fn test_add_exec_algorithm_success() {
972 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
973 create_trader_components();
974 let trader_id = TraderId::test_default();
975 let instance_id = UUID4::new();
976
977 let mut trader = Trader::new(
978 trader_id,
979 instance_id,
980 Environment::Backtest,
981 clock,
982 cache,
983 portfolio,
984 );
985
986 let config = DataActorConfig {
987 actor_id: Some(ActorId::from("TestExecAlgorithm")),
988 ..Default::default()
989 };
990 let exec_algorithm = TestDataActor::new(config);
991 let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
992
993 let result = trader.add_exec_algorithm(exec_algorithm);
994 assert!(result.is_ok());
995 assert_eq!(trader.exec_algorithm_count(), 1);
996 assert_eq!(trader.component_count(), 1);
997 assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
998 }
999
1000 #[rstest]
1001 fn test_component_lifecycle() {
1002 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1003 create_trader_components();
1004 let trader_id = TraderId::test_default();
1005 let instance_id = UUID4::new();
1006
1007 let mut trader = Trader::new(
1008 trader_id,
1009 instance_id,
1010 Environment::Backtest,
1011 clock,
1012 cache,
1013 portfolio,
1014 );
1015
1016 let actor = TestDataActor::new(DataActorConfig::default());
1018
1019 let strategy_config = StrategyConfig {
1020 strategy_id: Some(StrategyId::from("Test-Strategy")),
1021 ..Default::default()
1022 };
1023 let strategy = TestStrategy::new(strategy_config);
1024
1025 let exec_algorithm_config = DataActorConfig {
1026 actor_id: Some(ActorId::from("TestExecAlgorithm")),
1027 ..Default::default()
1028 };
1029 let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1030
1031 assert!(trader.add_actor(actor).is_ok());
1032 assert!(trader.add_strategy(strategy).is_ok());
1033 assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1034 assert_eq!(trader.component_count(), 3);
1035
1036 let start_result = trader.start_components();
1038 assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1039
1040 assert!(trader.stop_components().is_ok());
1042
1043 assert!(trader.reset_components().is_ok());
1045
1046 assert!(trader.dispose_components().is_ok());
1048 assert_eq!(trader.component_count(), 0);
1049 }
1050
1051 #[rstest]
1052 fn test_trader_component_lifecycle() {
1053 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1054 create_trader_components();
1055 let trader_id = TraderId::test_default();
1056 let instance_id = UUID4::new();
1057
1058 let mut trader = Trader::new(
1059 trader_id,
1060 instance_id,
1061 Environment::Backtest,
1062 clock,
1063 cache,
1064 portfolio,
1065 );
1066
1067 assert_eq!(trader.state(), ComponentState::PreInitialized);
1069 assert!(!trader.is_running());
1070 assert!(!trader.is_stopped());
1071 assert!(!trader.is_disposed());
1072
1073 assert!(trader.start().is_err());
1075
1076 trader.initialize().unwrap();
1078
1079 assert!(trader.start().is_ok());
1081 assert_eq!(trader.state(), ComponentState::Running);
1082 assert!(trader.is_running());
1083 assert!(trader.ts_started().is_some());
1084
1085 assert!(trader.stop().is_ok());
1087 assert_eq!(trader.state(), ComponentState::Stopped);
1088 assert!(trader.is_stopped());
1089 assert!(trader.ts_stopped().is_some());
1090
1091 assert!(trader.reset().is_ok());
1093 assert_eq!(trader.state(), ComponentState::Ready);
1094 assert!(trader.ts_started().is_none());
1095 assert!(trader.ts_stopped().is_none());
1096
1097 assert!(trader.dispose().is_ok());
1099 assert_eq!(trader.state(), ComponentState::Disposed);
1100 assert!(trader.is_disposed());
1101 }
1102
1103 #[rstest]
1104 fn test_cannot_add_components_while_running() {
1105 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1106 create_trader_components();
1107 let trader_id = TraderId::test_default();
1108 let instance_id = UUID4::new();
1109
1110 let mut trader = Trader::new(
1111 trader_id,
1112 instance_id,
1113 Environment::Backtest,
1114 clock,
1115 cache,
1116 portfolio,
1117 );
1118
1119 trader.state = ComponentState::Running;
1121
1122 let actor = TestDataActor::new(DataActorConfig::default());
1123 let result = trader.add_actor(actor);
1124 assert!(result.is_err());
1125 assert!(
1126 result
1127 .unwrap_err()
1128 .to_string()
1129 .contains("while trader is running")
1130 );
1131 }
1132
1133 #[rstest]
1134 fn test_create_component_clock_backtest_vs_live() {
1135 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1136 create_trader_components();
1137 let trader_id = TraderId::test_default();
1138 let instance_id = UUID4::new();
1139
1140 let trader_backtest = Trader::new(
1142 trader_id,
1143 instance_id,
1144 Environment::Backtest,
1145 clock.clone(),
1146 cache.clone(),
1147 portfolio.clone(),
1148 );
1149
1150 let backtest_clock = trader_backtest.create_component_clock();
1151 assert_ne!(
1153 backtest_clock.as_ptr() as *const _,
1154 clock.as_ptr() as *const _
1155 );
1156
1157 let trader_live = Trader::new(
1159 trader_id,
1160 instance_id,
1161 Environment::Live,
1162 clock.clone(),
1163 cache,
1164 portfolio,
1165 );
1166
1167 let live_clock = trader_live.create_component_clock();
1168 assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1170 }
1171}