1use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
23
24use nautilus_common::{
25 actor::{DataActor, registry::try_get_actor_unchecked},
26 cache::Cache,
27 clock::{Clock, TestClock},
28 component::{
29 Component, dispose_component, register_component_actor, reset_component, start_component,
30 stop_component,
31 },
32 enums::{ComponentState, ComponentTrigger, Environment},
33 msgbus,
34 msgbus::{
35 handler::{ShareableMessageHandler, TypedMessageHandler},
36 switchboard::{get_event_orders_topic, get_event_positions_topic},
37 },
38 timer::{TimeEvent, TimeEventCallback},
39};
40use nautilus_core::{UUID4, UnixNanos};
41use nautilus_model::{
42 events::{OrderEventAny, PositionEvent},
43 identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
44};
45use nautilus_portfolio::portfolio::Portfolio;
46use nautilus_trading::strategy::Strategy;
47
/// Registry and lifecycle coordinator for the components owned by one trader.
///
/// Tracks the IDs of every registered actor, strategy and execution algorithm,
/// the clock handed to each of them, and the trader's own lifecycle state and
/// timestamps.
pub struct Trader {
    /// The trader identifier.
    pub trader_id: TraderId,
    /// The instance identifier supplied at construction.
    pub instance_id: UUID4,
    /// The environment; it decides how component clocks are created.
    pub environment: Environment,
    /// Current lifecycle state of the trader itself.
    state: ComponentState,
    /// Trader clock: source of the lifecycle timestamps, and the clock handed
    /// to components in live and sandbox environments.
    clock: Rc<RefCell<dyn Clock>>,
    /// Cache handle passed to each component when it is registered.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio handle passed to each strategy when it is registered.
    portfolio: Rc<RefCell<Portfolio>>,
    /// IDs of the registered actors, in registration order.
    actor_ids: Vec<ActorId>,
    /// IDs of the registered strategies, in registration order.
    strategy_ids: Vec<StrategyId>,
    /// IDs of the registered execution algorithms, in registration order.
    exec_algorithm_ids: Vec<ExecAlgorithmId>,
    /// Clock assigned to each registered component, keyed by component ID.
    clocks: HashMap<ComponentId, Rc<RefCell<dyn Clock>>>,
    /// Timestamp read from the trader clock at construction.
    ts_created: UnixNanos,
    /// Timestamp of the most recent start; `None` until started and after a reset.
    ts_started: Option<UnixNanos>,
    /// Timestamp of the most recent stop; `None` until stopped and after a reset.
    ts_stopped: Option<UnixNanos>,
}
83
84impl Debug for Trader {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{:?}", stringify!(TraderId)) }
88}
89
90impl Trader {
91 #[must_use]
93 pub fn new(
94 trader_id: TraderId,
95 instance_id: UUID4,
96 environment: Environment,
97 clock: Rc<RefCell<dyn Clock>>,
98 cache: Rc<RefCell<Cache>>,
99 portfolio: Rc<RefCell<Portfolio>>,
100 ) -> Self {
101 let ts_created = clock.borrow().timestamp_ns();
102
103 Self {
104 trader_id,
105 instance_id,
106 environment,
107 state: ComponentState::PreInitialized,
108 clock,
109 cache,
110 portfolio,
111 actor_ids: Vec::new(),
112 strategy_ids: Vec::new(),
113 exec_algorithm_ids: Vec::new(),
114 clocks: HashMap::new(),
115 ts_created,
116 ts_started: None,
117 ts_stopped: None,
118 }
119 }
120
    /// Returns the trader ID.
    #[must_use]
    pub const fn trader_id(&self) -> TraderId {
        self.trader_id
    }
126
    /// Returns the instance ID supplied at construction.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.instance_id
    }
132
    /// Returns the environment the trader was created for.
    #[must_use]
    pub const fn environment(&self) -> Environment {
        self.environment
    }
138
    /// Returns the trader's current lifecycle state.
    #[must_use]
    pub const fn state(&self) -> ComponentState {
        self.state
    }
144
    /// Returns the timestamp recorded when the trader was constructed.
    #[must_use]
    pub const fn ts_created(&self) -> UnixNanos {
        self.ts_created
    }
150
    /// Returns the timestamp of the most recent start, or `None` if the trader
    /// has not been started since construction or the last reset.
    #[must_use]
    pub const fn ts_started(&self) -> Option<UnixNanos> {
        self.ts_started
    }
156
    /// Returns the timestamp of the most recent stop, or `None` if the trader
    /// has not been stopped since construction or the last reset.
    #[must_use]
    pub const fn ts_stopped(&self) -> Option<UnixNanos> {
        self.ts_stopped
    }
162
    /// Returns the number of actor IDs tracked by the trader.
    #[must_use]
    pub const fn actor_count(&self) -> usize {
        self.actor_ids.len()
    }
168
    /// Returns the number of registered strategies.
    #[must_use]
    pub const fn strategy_count(&self) -> usize {
        self.strategy_ids.len()
    }
174
    /// Returns the number of registered execution algorithms.
    #[must_use]
    pub const fn exec_algorithm_count(&self) -> usize {
        self.exec_algorithm_ids.len()
    }
180
181 #[must_use]
183 pub const fn component_count(&self) -> usize {
184 self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
185 }
186
    /// Returns a copy of the tracked actor IDs, in registration order.
    #[must_use]
    pub fn actor_ids(&self) -> Vec<ActorId> {
        self.actor_ids.clone()
    }
192
    /// Returns a copy of the registered strategy IDs, in registration order.
    #[must_use]
    pub fn strategy_ids(&self) -> Vec<StrategyId> {
        self.strategy_ids.clone()
    }
198
    /// Returns a copy of the registered execution algorithm IDs, in registration order.
    #[must_use]
    pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
        self.exec_algorithm_ids.clone()
    }
204
205 fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
210 match self.environment {
211 Environment::Backtest => {
212 Rc::new(RefCell::new(TestClock::new()))
214 }
215 Environment::Live | Environment::Sandbox => {
216 self.clock.clone()
218 }
219 }
220 }
221
222 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
230 where
231 T: DataActor + Component + Debug + 'static,
232 {
233 self.validate_component_registration()?;
234
235 let actor_id = actor.actor_id();
236
237 if self.actor_ids.contains(&actor_id) {
239 anyhow::bail!("Actor '{actor_id}' is already registered");
240 }
241
242 let clock = self.create_component_clock();
243 let component_id = ComponentId::new(actor_id.inner().as_str());
244 self.clocks.insert(component_id, clock.clone());
245
246 let mut actor_mut = actor;
247 actor_mut.register(self.trader_id, clock, self.cache.clone())?;
248
249 self.add_registered_actor(actor_mut)
250 }
251
252 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
264 where
265 F: FnOnce() -> anyhow::Result<T>,
266 T: DataActor + Component + Debug + 'static,
267 {
268 let actor = factory()?;
269
270 self.add_actor(actor)
271 }
272
    /// Takes ownership of an actor, hands it to `register_component_actor`, and
    /// starts tracking its ID.
    ///
    /// Unlike `add_actor`, this method does not call the actor's `register`,
    /// does not check the trader's state and does not check for duplicate IDs.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`.
    pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
    where
        T: DataActor + Component + Debug + 'static,
    {
        // Capture the ID first: the actor is moved in the next statement.
        let actor_id = actor.actor_id();

        register_component_actor(actor);

        self.actor_ids.push(actor_id);

        log::info!(
            "Registered actor '{actor_id}' with trader {}",
            self.trader_id
        );

        Ok(())
    }
297
298 pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
308 if self.actor_ids.contains(&actor_id) {
310 anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
311 }
312
313 self.actor_ids.push(actor_id);
315
316 log::debug!(
317 "Added actor ID '{actor_id}' to trader {} for lifecycle management",
318 self.trader_id
319 );
320
321 Ok(())
322 }
323
324 pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
336 where
337 T: Strategy + Component + Debug + 'static,
338 {
339 self.validate_component_registration()?;
340
341 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
342
343 if self.strategy_ids.contains(&strategy_id) {
345 anyhow::bail!("Strategy '{strategy_id}' is already registered");
346 }
347
348 let clock = self.create_component_clock();
349 let component_id = strategy.component_id();
350 self.clocks.insert(component_id, clock.clone());
351
352 strategy.core_mut().register(
354 self.trader_id,
355 clock.clone(),
356 self.cache.clone(),
357 self.portfolio.clone(),
358 )?;
359
360 let actor_id = strategy.actor_id().inner();
362 let callback = TimeEventCallback::from(move |event: TimeEvent| {
363 if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
364 actor.handle_time_event(&event);
365 } else {
366 log::error!("Strategy {actor_id} not found for time event handling");
367 }
368 });
369 clock.borrow_mut().register_default_handler(callback);
370
371 strategy.initialize()?;
373
374 register_component_actor(strategy);
376
377 let order_topic = get_event_orders_topic(strategy_id);
378 let order_actor_id = actor_id;
379 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
380 move |event: &OrderEventAny| {
381 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
382 strategy.handle_order_event(event.clone());
383 } else {
384 log::error!("Strategy {order_actor_id} not found for order event handling");
385 }
386 },
387 )));
388 msgbus::subscribe_topic(order_topic, handler, None);
389
390 let position_topic = get_event_positions_topic(strategy_id);
391 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
392 move |event: &PositionEvent| {
393 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
394 strategy.handle_position_event(event.clone());
395 } else {
396 log::error!("Strategy {actor_id} not found for position event handling");
397 }
398 },
399 )));
400 msgbus::subscribe_topic(position_topic, handler, None);
401
402 self.strategy_ids.push(strategy_id);
403
404 log::info!(
405 "Registered strategy {strategy_id} with trader {}",
406 self.trader_id
407 );
408
409 Ok(())
410 }
411
412 pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
423 where
424 T: DataActor + Component + Debug + 'static,
425 {
426 self.validate_component_registration()?;
427
428 let exec_algorithm_id =
429 ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
430
431 if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
433 anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
434 }
435
436 let clock = self.create_component_clock();
437 let component_id = exec_algorithm.component_id();
438 self.clocks.insert(component_id, clock.clone());
439
440 exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
441
442 register_component_actor(exec_algorithm);
444
445 self.exec_algorithm_ids.push(exec_algorithm_id);
446
447 log::info!(
448 "Registered execution algorithm '{exec_algorithm_id}' with trader {}",
449 self.trader_id
450 );
451
452 Ok(())
453 }
454
455 fn validate_component_registration(&self) -> anyhow::Result<()> {
457 match self.state {
458 ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
459 Ok(())
460 }
461 ComponentState::Running => {
462 anyhow::bail!("Cannot add components while trader is running")
463 }
464 ComponentState::Disposed => {
465 anyhow::bail!("Cannot add components to disposed trader")
466 }
467 _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
468 }
469 }
470
471 pub fn start_components(&mut self) -> anyhow::Result<()> {
477 for actor_id in &self.actor_ids {
478 log::debug!("Starting actor {actor_id}");
479 start_component(&actor_id.inner())?;
480 }
481
482 for strategy_id in &self.strategy_ids {
483 log::debug!("Starting strategy {strategy_id}");
484 start_component(&strategy_id.inner())?;
485 }
486
487 for exec_algorithm_id in &self.exec_algorithm_ids {
488 log::debug!("Starting execution algorithm {exec_algorithm_id}");
489 start_component(&exec_algorithm_id.inner())?;
490 }
491
492 Ok(())
493 }
494
495 pub fn stop_components(&mut self) -> anyhow::Result<()> {
501 for actor_id in &self.actor_ids {
502 log::debug!("Stopping actor {actor_id}");
503 stop_component(&actor_id.inner())?;
504 }
505
506 for exec_algorithm_id in &self.exec_algorithm_ids {
507 log::debug!("Stopping execution algorithm {exec_algorithm_id}");
508 stop_component(&exec_algorithm_id.inner())?;
509 }
510
511 for strategy_id in &self.strategy_ids {
512 log::debug!("Stopping strategy {strategy_id}");
513 stop_component(&strategy_id.inner())?;
514 }
515
516 Ok(())
517 }
518
519 pub fn reset_components(&mut self) -> anyhow::Result<()> {
525 for actor_id in &self.actor_ids {
526 log::debug!("Resetting actor {actor_id}");
527 reset_component(&actor_id.inner())?;
528 }
529
530 for strategy_id in &self.strategy_ids {
531 log::debug!("Resetting strategy {strategy_id}");
532 reset_component(&strategy_id.inner())?;
533 }
534
535 for exec_algorithm_id in &self.exec_algorithm_ids {
536 log::debug!("Resetting execution algorithm {exec_algorithm_id}");
537 reset_component(&exec_algorithm_id.inner())?;
538 }
539
540 Ok(())
541 }
542
543 pub fn dispose_components(&mut self) -> anyhow::Result<()> {
549 for actor_id in &self.actor_ids {
550 log::debug!("Disposing actor {actor_id}");
551 dispose_component(&actor_id.inner())?;
552 }
553
554 for strategy_id in &self.strategy_ids {
555 log::debug!("Disposing strategy {strategy_id}");
556 dispose_component(&strategy_id.inner())?;
557 }
558
559 for exec_algorithm_id in &self.exec_algorithm_ids {
560 log::debug!("Disposing execution algorithm {exec_algorithm_id}");
561 dispose_component(&exec_algorithm_id.inner())?;
562 }
563
564 self.actor_ids.clear();
565 self.strategy_ids.clear();
566 self.exec_algorithm_ids.clear();
567 self.clocks.clear();
568
569 Ok(())
570 }
571
572 pub fn initialize(&mut self) -> anyhow::Result<()> {
580 let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
581 self.state = new_state;
582
583 Ok(())
584 }
585
586 fn on_start(&mut self) -> anyhow::Result<()> {
587 self.start_components()?;
588
589 self.ts_started = Some(self.clock.borrow().timestamp_ns());
591
592 Ok(())
593 }
594
595 fn on_stop(&mut self) -> anyhow::Result<()> {
596 self.stop_components()?;
597
598 self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
599
600 Ok(())
601 }
602
603 fn on_reset(&mut self) -> anyhow::Result<()> {
604 self.reset_components()?;
605
606 self.ts_started = None;
607 self.ts_stopped = None;
608
609 Ok(())
610 }
611
612 fn on_dispose(&mut self) -> anyhow::Result<()> {
613 if self.is_running() {
614 self.stop()?;
615 }
616
617 self.dispose_components()?;
618
619 Ok(())
620 }
621}
622
623impl Component for Trader {
624 fn component_id(&self) -> ComponentId {
625 ComponentId::new(format!("Trader-{}", self.trader_id))
626 }
627
628 fn state(&self) -> ComponentState {
629 self.state
630 }
631
632 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
633 self.state = self.state.transition(&trigger)?;
634 log::info!("{}", self.state.variant_name());
635 Ok(())
636 }
637
638 fn register(
639 &mut self,
640 _trader_id: TraderId,
641 _clock: Rc<RefCell<dyn Clock>>,
642 _cache: Rc<RefCell<Cache>>,
643 ) -> anyhow::Result<()> {
644 anyhow::bail!("Trader cannot register with itself")
645 }
646
647 fn on_start(&mut self) -> anyhow::Result<()> {
648 Self::on_start(self)
649 }
650
651 fn on_stop(&mut self) -> anyhow::Result<()> {
652 Self::on_stop(self)
653 }
654
655 fn on_reset(&mut self) -> anyhow::Result<()> {
656 Self::on_reset(self)
657 }
658
659 fn on_dispose(&mut self) -> anyhow::Result<()> {
660 Self::on_dispose(self)
661 }
662}
663
664#[cfg(test)]
665mod tests {
666 use std::{
667 cell::RefCell,
668 ops::{Deref, DerefMut},
669 rc::Rc,
670 };
671
672 use nautilus_common::{
673 actor::{DataActorCore, data_actor::DataActorConfig},
674 cache::Cache,
675 clock::TestClock,
676 enums::{ComponentState, Environment},
677 msgbus::MessageBus,
678 };
679 use nautilus_core::UUID4;
680 use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
681 use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
682 use nautilus_model::{
683 identifiers::{ActorId, ComponentId, TraderId},
684 stubs::TestDefault,
685 };
686 use nautilus_portfolio::portfolio::Portfolio;
687 use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
688 use nautilus_trading::strategy::{
689 Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
690 };
691 use rstest::rstest;
692
693 use super::*;
694
    /// Minimal actor for the tests: a thin wrapper around a `DataActorCore`.
    #[derive(Debug)]
    struct TestDataActor {
        core: DataActorCore,
    }

    impl TestDataActor {
        /// Builds the actor from `config`.
        fn new(config: DataActorConfig) -> Self {
            Self {
                core: DataActorCore::new(config),
            }
        }
    }

    // Empty impl: the actor relies entirely on the trait's provided methods.
    impl DataActor for TestDataActor {}

    // Expose the wrapped core through `Deref`/`DerefMut`.
    impl Deref for TestDataActor {
        type Target = DataActorCore;
        fn deref(&self) -> &Self::Target {
            &self.core
        }
    }

    impl DerefMut for TestDataActor {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.core
        }
    }
723
    /// Minimal strategy for the tests: a thin wrapper around a `StrategyCore`.
    #[derive(Debug)]
    struct TestStrategy {
        core: StrategyCore,
    }

    impl TestStrategy {
        /// Builds the strategy from `config`.
        fn new(config: StrategyConfig) -> Self {
            Self {
                core: StrategyCore::new(config),
            }
        }
    }

    // Empty impl: the strategy relies entirely on the trait's provided methods.
    impl DataActor for TestStrategy {}

    // The target is `DataActorCore` although the field is a `StrategyCore`;
    // presumably `StrategyCore` itself derefs to `DataActorCore` (TODO confirm).
    impl Deref for TestStrategy {
        type Target = DataActorCore;
        fn deref(&self) -> &Self::Target {
            &self.core
        }
    }

    impl DerefMut for TestStrategy {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.core
        }
    }

    impl StrategyTrait for TestStrategy {
        /// Gives mutable access to the wrapped strategy core.
        fn core_mut(&mut self) -> &mut StrategyCore {
            &mut self.core
        }
    }
759
    /// Builds the collaborators used by the tests and returns them as
    /// `(msgbus, cache, portfolio, data_engine, risk_engine, exec_engine, clock)`.
    ///
    /// The message bus, cache, portfolio, data engine and execution engine all
    /// share one test clock and one cache. The risk engine is built on its own
    /// separate cache, clock and portfolio.
    #[allow(clippy::type_complexity)]
    fn create_trader_components() -> (
        Rc<RefCell<MessageBus>>,
        Rc<RefCell<Cache>>,
        Rc<RefCell<Portfolio>>,
        Rc<RefCell<DataEngine>>,
        Rc<RefCell<RiskEngine>>,
        Rc<RefCell<ExecutionEngine>>,
        Rc<RefCell<TestClock>>,
    ) {
        let trader_id = TraderId::test_default();
        let instance_id = UUID4::new();
        let clock = Rc::new(RefCell::new(TestClock::new()));
        // Move the shared clock to a non-zero time; `test_trader_creation`
        // asserts that the creation timestamp is greater than zero.
        clock.borrow_mut().set_time(1_000_000_000u64.into());
        let msgbus = Rc::new(RefCell::new(MessageBus::new(
            trader_id,
            instance_id,
            Some("test".to_string()),
            None,
        )));
        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
        let portfolio = Rc::new(RefCell::new(Portfolio::new(
            cache.clone(),
            clock.clone() as Rc<RefCell<dyn Clock>>,
            None,
        )));
        let data_engine = Rc::new(RefCell::new(DataEngine::new(
            clock.clone(),
            cache.clone(),
            Some(DataEngineConfig::default()),
        )));

        // The risk engine takes its portfolio by value, so it gets dedicated
        // instances rather than the shared `Rc` handles.
        let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
        let risk_clock = Rc::new(RefCell::new(TestClock::new()));
        let risk_portfolio = Portfolio::new(
            risk_cache.clone(),
            risk_clock.clone() as Rc<RefCell<dyn Clock>>,
            None,
        );
        let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
            RiskEngineConfig::default(),
            risk_portfolio,
            risk_clock as Rc<RefCell<dyn Clock>>,
            risk_cache,
        )));
        let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
            clock.clone(),
            cache.clone(),
            Some(ExecutionEngineConfig::default()),
        )));

        (
            msgbus,
            cache,
            portfolio,
            data_engine,
            risk_engine,
            exec_engine,
            clock,
        )
    }
823
824 #[rstest]
825 fn test_trader_creation() {
826 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
827 create_trader_components();
828 let trader_id = TraderId::test_default();
829 let instance_id = UUID4::new();
830
831 let trader = Trader::new(
832 trader_id,
833 instance_id,
834 Environment::Backtest,
835 clock,
836 cache,
837 portfolio,
838 );
839
840 assert_eq!(trader.trader_id(), trader_id);
841 assert_eq!(trader.instance_id(), instance_id);
842 assert_eq!(trader.environment(), Environment::Backtest);
843 assert_eq!(trader.state(), ComponentState::PreInitialized);
844 assert_eq!(trader.actor_count(), 0);
845 assert_eq!(trader.strategy_count(), 0);
846 assert_eq!(trader.exec_algorithm_count(), 0);
847 assert_eq!(trader.component_count(), 0);
848 assert!(!trader.is_running());
849 assert!(!trader.is_stopped());
850 assert!(!trader.is_disposed());
851 assert!(trader.ts_created() > 0);
852 assert!(trader.ts_started().is_none());
853 assert!(trader.ts_stopped().is_none());
854 }
855
856 #[rstest]
857 fn test_trader_component_id() {
858 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
859 create_trader_components();
860 let trader_id = TraderId::from("TRADER-001");
861 let instance_id = UUID4::new();
862
863 let trader = Trader::new(
864 trader_id,
865 instance_id,
866 Environment::Backtest,
867 clock,
868 cache,
869 portfolio,
870 );
871
872 assert_eq!(
873 trader.component_id(),
874 ComponentId::from("Trader-TRADER-001")
875 );
876 }
877
878 #[rstest]
879 fn test_add_actor_success() {
880 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
881 create_trader_components();
882 let trader_id = TraderId::test_default();
883 let instance_id = UUID4::new();
884
885 let mut trader = Trader::new(
886 trader_id,
887 instance_id,
888 Environment::Backtest,
889 clock,
890 cache,
891 portfolio,
892 );
893
894 let actor = TestDataActor::new(DataActorConfig::default());
895 let actor_id = actor.actor_id();
896
897 let result = trader.add_actor(actor);
898 assert!(result.is_ok());
899 assert_eq!(trader.actor_count(), 1);
900 assert_eq!(trader.component_count(), 1);
901 assert!(trader.actor_ids().contains(&actor_id));
902 }
903
904 #[rstest]
905 fn test_add_duplicate_actor_fails() {
906 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
907 create_trader_components();
908 let trader_id = TraderId::test_default();
909 let instance_id = UUID4::new();
910
911 let mut trader = Trader::new(
912 trader_id,
913 instance_id,
914 Environment::Backtest,
915 clock,
916 cache,
917 portfolio,
918 );
919
920 let config = DataActorConfig {
921 actor_id: Some(ActorId::from("TestActor")),
922 ..Default::default()
923 };
924 let actor1 = TestDataActor::new(config.clone());
925 let actor2 = TestDataActor::new(config);
926
927 assert!(trader.add_actor(actor1).is_ok());
929 assert_eq!(trader.actor_count(), 1);
930
931 let result = trader.add_actor(actor2);
933 assert!(result.is_err());
934 assert!(
935 result
936 .unwrap_err()
937 .to_string()
938 .contains("already registered")
939 );
940 assert_eq!(trader.actor_count(), 1);
941 }
942
943 #[rstest]
944 fn test_add_strategy_success() {
945 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
946 create_trader_components();
947 let trader_id = TraderId::test_default();
948 let instance_id = UUID4::new();
949
950 let mut trader = Trader::new(
951 trader_id,
952 instance_id,
953 Environment::Backtest,
954 clock,
955 cache,
956 portfolio,
957 );
958
959 let config = StrategyConfig {
960 strategy_id: Some(StrategyId::from("Test-Strategy")),
961 ..Default::default()
962 };
963 let strategy = TestStrategy::new(config);
964 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
965
966 let result = trader.add_strategy(strategy);
967 assert!(result.is_ok());
968 assert_eq!(trader.strategy_count(), 1);
969 assert_eq!(trader.component_count(), 1);
970 assert!(trader.strategy_ids().contains(&strategy_id));
971 }
972
973 #[rstest]
974 fn test_add_exec_algorithm_success() {
975 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
976 create_trader_components();
977 let trader_id = TraderId::test_default();
978 let instance_id = UUID4::new();
979
980 let mut trader = Trader::new(
981 trader_id,
982 instance_id,
983 Environment::Backtest,
984 clock,
985 cache,
986 portfolio,
987 );
988
989 let config = DataActorConfig {
990 actor_id: Some(ActorId::from("TestExecAlgorithm")),
991 ..Default::default()
992 };
993 let exec_algorithm = TestDataActor::new(config);
994 let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
995
996 let result = trader.add_exec_algorithm(exec_algorithm);
997 assert!(result.is_ok());
998 assert_eq!(trader.exec_algorithm_count(), 1);
999 assert_eq!(trader.component_count(), 1);
1000 assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
1001 }
1002
    /// Registers one actor, one strategy and one execution algorithm, then
    /// drives them through start, stop, reset and dispose.
    #[rstest]
    fn test_component_lifecycle() {
        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
            create_trader_components();
        let trader_id = TraderId::test_default();
        let instance_id = UUID4::new();

        let mut trader = Trader::new(
            trader_id,
            instance_id,
            Environment::Backtest,
            clock,
            cache,
            portfolio,
        );

        let actor = TestDataActor::new(DataActorConfig::default());

        let strategy_config = StrategyConfig {
            strategy_id: Some(StrategyId::from("Test-Strategy")),
            ..Default::default()
        };
        let strategy = TestStrategy::new(strategy_config);

        let exec_algorithm_config = DataActorConfig {
            actor_id: Some(ActorId::from("TestExecAlgorithm")),
            ..Default::default()
        };
        let exec_algorithm = TestDataActor::new(exec_algorithm_config);

        assert!(trader.add_actor(actor).is_ok());
        assert!(trader.add_strategy(strategy).is_ok());
        assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
        assert_eq!(trader.component_count(), 3);

        // The message arguments are only evaluated when the assertion fails,
        // so `unwrap_err` is never called on an `Ok` result.
        let start_result = trader.start_components();
        assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());

        assert!(trader.stop_components().is_ok());

        assert!(trader.reset_components().is_ok());

        // Disposal also clears the trader's component bookkeeping.
        assert!(trader.dispose_components().is_ok());
        assert_eq!(trader.component_count(), 0);
    }
1053
    /// Drives the trader itself through its lifecycle and checks the state and
    /// timestamps after each step.
    #[rstest]
    fn test_trader_component_lifecycle() {
        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
            create_trader_components();
        let trader_id = TraderId::test_default();
        let instance_id = UUID4::new();

        let mut trader = Trader::new(
            trader_id,
            instance_id,
            Environment::Backtest,
            clock,
            cache,
            portfolio,
        );

        // Initial state.
        assert_eq!(trader.state(), ComponentState::PreInitialized);
        assert!(!trader.is_running());
        assert!(!trader.is_stopped());
        assert!(!trader.is_disposed());

        // Starting before initialization is rejected.
        assert!(trader.start().is_err());

        trader.initialize().unwrap();

        // Start: the state becomes `Running` and the start timestamp is set.
        assert!(trader.start().is_ok());
        assert_eq!(trader.state(), ComponentState::Running);
        assert!(trader.is_running());
        assert!(trader.ts_started().is_some());

        // Stop: the state becomes `Stopped` and the stop timestamp is set.
        assert!(trader.stop().is_ok());
        assert_eq!(trader.state(), ComponentState::Stopped);
        assert!(trader.is_stopped());
        assert!(trader.ts_stopped().is_some());

        // Reset: back to `Ready` with both timestamps cleared.
        assert!(trader.reset().is_ok());
        assert_eq!(trader.state(), ComponentState::Ready);
        assert!(trader.ts_started().is_none());
        assert!(trader.ts_stopped().is_none());

        // Dispose: terminal state.
        assert!(trader.dispose().is_ok());
        assert_eq!(trader.state(), ComponentState::Disposed);
        assert!(trader.is_disposed());
    }
1105
1106 #[rstest]
1107 fn test_cannot_add_components_while_running() {
1108 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1109 create_trader_components();
1110 let trader_id = TraderId::test_default();
1111 let instance_id = UUID4::new();
1112
1113 let mut trader = Trader::new(
1114 trader_id,
1115 instance_id,
1116 Environment::Backtest,
1117 clock,
1118 cache,
1119 portfolio,
1120 );
1121
1122 trader.state = ComponentState::Running;
1124
1125 let actor = TestDataActor::new(DataActorConfig::default());
1126 let result = trader.add_actor(actor);
1127 assert!(result.is_err());
1128 assert!(
1129 result
1130 .unwrap_err()
1131 .to_string()
1132 .contains("while trader is running")
1133 );
1134 }
1135
    /// In a backtest each component clock is a new allocation; in a live
    /// environment the component clock is the trader's own clock.
    #[rstest]
    fn test_create_component_clock_backtest_vs_live() {
        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
            create_trader_components();
        let trader_id = TraderId::test_default();
        let instance_id = UUID4::new();

        let trader_backtest = Trader::new(
            trader_id,
            instance_id,
            Environment::Backtest,
            clock.clone(),
            cache.clone(),
            portfolio.clone(),
        );

        // Backtest: the returned clock must not point at the trader's clock.
        let backtest_clock = trader_backtest.create_component_clock();
        assert_ne!(
            backtest_clock.as_ptr() as *const _,
            clock.as_ptr() as *const _
        );

        let trader_live = Trader::new(
            trader_id,
            instance_id,
            Environment::Live,
            clock.clone(),
            cache,
            portfolio,
        );

        // Live: the returned clock must point at the trader's clock.
        let live_clock = trader_live.create_component_clock();
        assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
    }
1174}