1use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
23
24use nautilus_common::{
25 actor::{DataActor, registry::try_get_actor_unchecked},
26 cache::Cache,
27 clock::{Clock, TestClock},
28 component::{
29 Component, dispose_component, register_component_actor, reset_component, start_component,
30 stop_component,
31 },
32 enums::{ComponentState, ComponentTrigger, Environment},
33 msgbus,
34 msgbus::{
35 handler::{ShareableMessageHandler, TypedMessageHandler},
36 switchboard::{get_event_orders_topic, get_event_positions_topic},
37 },
38 timer::{TimeEvent, TimeEventCallback},
39};
40use nautilus_core::{UUID4, UnixNanos};
41use nautilus_model::{
42 events::{OrderEventAny, PositionEvent},
43 identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
44};
45use nautilus_portfolio::portfolio::Portfolio;
46use nautilus_trading::strategy::Strategy;
47
48pub struct Trader {
54 pub trader_id: TraderId,
56 pub instance_id: UUID4,
58 pub environment: Environment,
60 state: ComponentState,
62 clock: Rc<RefCell<dyn Clock>>,
64 cache: Rc<RefCell<Cache>>,
66 portfolio: Rc<RefCell<Portfolio>>,
68 actor_ids: Vec<ActorId>,
70 strategy_ids: Vec<StrategyId>,
72 exec_algorithm_ids: Vec<ExecAlgorithmId>,
74 clocks: HashMap<ComponentId, Rc<RefCell<dyn Clock>>>, ts_created: UnixNanos,
78 ts_started: Option<UnixNanos>,
80 ts_stopped: Option<UnixNanos>,
82}
83
84impl Debug for Trader {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{:?}", stringify!(TraderId)) }
88}
89
90impl Trader {
91 #[must_use]
93 pub fn new(
94 trader_id: TraderId,
95 instance_id: UUID4,
96 environment: Environment,
97 clock: Rc<RefCell<dyn Clock>>,
98 cache: Rc<RefCell<Cache>>,
99 portfolio: Rc<RefCell<Portfolio>>,
100 ) -> Self {
101 let ts_created = clock.borrow().timestamp_ns();
102
103 Self {
104 trader_id,
105 instance_id,
106 environment,
107 state: ComponentState::PreInitialized,
108 clock,
109 cache,
110 portfolio,
111 actor_ids: Vec::new(),
112 strategy_ids: Vec::new(),
113 exec_algorithm_ids: Vec::new(),
114 clocks: HashMap::new(),
115 ts_created,
116 ts_started: None,
117 ts_stopped: None,
118 }
119 }
120
121 #[must_use]
123 pub const fn trader_id(&self) -> TraderId {
124 self.trader_id
125 }
126
127 #[must_use]
129 pub const fn instance_id(&self) -> UUID4 {
130 self.instance_id
131 }
132
133 #[must_use]
135 pub const fn environment(&self) -> Environment {
136 self.environment
137 }
138
139 #[must_use]
141 pub const fn state(&self) -> ComponentState {
142 self.state
143 }
144
145 #[must_use]
147 pub const fn ts_created(&self) -> UnixNanos {
148 self.ts_created
149 }
150
151 #[must_use]
153 pub const fn ts_started(&self) -> Option<UnixNanos> {
154 self.ts_started
155 }
156
157 #[must_use]
159 pub const fn ts_stopped(&self) -> Option<UnixNanos> {
160 self.ts_stopped
161 }
162
163 #[must_use]
165 pub const fn actor_count(&self) -> usize {
166 self.actor_ids.len()
167 }
168
169 #[must_use]
171 pub const fn strategy_count(&self) -> usize {
172 self.strategy_ids.len()
173 }
174
175 #[must_use]
177 pub const fn exec_algorithm_count(&self) -> usize {
178 self.exec_algorithm_ids.len()
179 }
180
181 #[must_use]
183 pub const fn component_count(&self) -> usize {
184 self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
185 }
186
187 #[must_use]
189 pub fn actor_ids(&self) -> Vec<ActorId> {
190 self.actor_ids.clone()
191 }
192
193 #[must_use]
195 pub fn strategy_ids(&self) -> Vec<StrategyId> {
196 self.strategy_ids.clone()
197 }
198
199 #[must_use]
201 pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
202 self.exec_algorithm_ids.clone()
203 }
204
205 fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
210 match self.environment {
211 Environment::Backtest => {
212 Rc::new(RefCell::new(TestClock::new()))
214 }
215 Environment::Live | Environment::Sandbox => {
216 self.clock.clone()
218 }
219 }
220 }
221
222 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
230 where
231 T: DataActor + Component + Debug + 'static,
232 {
233 self.validate_component_registration()?;
234
235 let actor_id = actor.actor_id();
236
237 if self.actor_ids.contains(&actor_id) {
239 anyhow::bail!("Actor '{actor_id}' is already registered");
240 }
241
242 let clock = self.create_component_clock();
243 let component_id = ComponentId::new(actor_id.inner().as_str());
244 self.clocks.insert(component_id, clock.clone());
245
246 let mut actor_mut = actor;
247 actor_mut.register(self.trader_id, clock, self.cache.clone())?;
248
249 self.add_registered_actor(actor_mut)
250 }
251
252 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
264 where
265 F: FnOnce() -> anyhow::Result<T>,
266 T: DataActor + Component + Debug + 'static,
267 {
268 let actor = factory()?;
269
270 self.add_actor(actor)
271 }
272
273 pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
279 where
280 T: DataActor + Component + Debug + 'static,
281 {
282 let actor_id = actor.actor_id();
283 let mem_addr = actor.mem_address();
284
285 register_component_actor(actor);
287
288 self.actor_ids.push(actor_id);
290
291 log::info!(
292 "Registered '{actor_id}' at mem_addr {mem_addr} with trader {}",
293 self.trader_id
294 );
295
296 Ok(())
297 }
298
299 pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
309 if self.actor_ids.contains(&actor_id) {
311 anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
312 }
313
314 self.actor_ids.push(actor_id);
316
317 log::debug!(
318 "Added actor ID '{actor_id}' to trader {} for lifecycle management",
319 self.trader_id
320 );
321
322 Ok(())
323 }
324
325 pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
337 where
338 T: Strategy + Component + Debug + 'static,
339 {
340 self.validate_component_registration()?;
341
342 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
343
344 if self.strategy_ids.contains(&strategy_id) {
346 anyhow::bail!("Strategy '{strategy_id}' is already registered");
347 }
348
349 let clock = self.create_component_clock();
350 let component_id = strategy.component_id();
351 self.clocks.insert(component_id, clock.clone());
352
353 strategy.core_mut().register(
355 self.trader_id,
356 clock.clone(),
357 self.cache.clone(),
358 self.portfolio.clone(),
359 )?;
360
361 let actor_id = strategy.actor_id().inner();
363 let callback = TimeEventCallback::from(move |event: TimeEvent| {
364 if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
365 actor.handle_time_event(&event);
366 } else {
367 log::error!("Strategy {actor_id} not found for time event handling");
368 }
369 });
370 clock.borrow_mut().register_default_handler(callback);
371
372 strategy.initialize()?;
374
375 register_component_actor(strategy);
377
378 let order_topic = get_event_orders_topic(strategy_id);
379 let order_actor_id = actor_id;
380 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
381 move |event: &OrderEventAny| {
382 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
383 strategy.handle_order_event(event.clone());
384 } else {
385 log::error!("Strategy {order_actor_id} not found for order event handling");
386 }
387 },
388 )));
389 msgbus::subscribe_topic(order_topic, handler, None);
390
391 let position_topic = get_event_positions_topic(strategy_id);
392 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
393 move |event: &PositionEvent| {
394 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
395 strategy.handle_position_event(event.clone());
396 } else {
397 log::error!("Strategy {actor_id} not found for position event handling");
398 }
399 },
400 )));
401 msgbus::subscribe_topic(position_topic, handler, None);
402
403 self.strategy_ids.push(strategy_id);
404 log::info!(
405 "Registered strategy '{strategy_id}' with trader {}",
406 self.trader_id
407 );
408
409 Ok(())
410 }
411
412 pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
423 where
424 T: DataActor + Component + Debug + 'static,
425 {
426 self.validate_component_registration()?;
427
428 let exec_algorithm_id =
429 ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
430
431 if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
433 anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
434 }
435
436 let clock = self.create_component_clock();
437 let component_id = exec_algorithm.component_id();
438 self.clocks.insert(component_id, clock.clone());
439
440 exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
441
442 register_component_actor(exec_algorithm);
444
445 self.exec_algorithm_ids.push(exec_algorithm_id);
446 log::info!(
447 "Registered execution algorithm '{exec_algorithm_id}' with trader {}",
448 self.trader_id
449 );
450
451 Ok(())
452 }
453
454 fn validate_component_registration(&self) -> anyhow::Result<()> {
456 match self.state {
457 ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
458 Ok(())
459 }
460 ComponentState::Running => {
461 anyhow::bail!("Cannot add components while trader is running")
462 }
463 ComponentState::Disposed => {
464 anyhow::bail!("Cannot add components to disposed trader")
465 }
466 _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
467 }
468 }
469
470 pub fn start_components(&mut self) -> anyhow::Result<()> {
476 for actor_id in &self.actor_ids {
477 log::debug!("Starting actor '{actor_id}'");
478 start_component(&actor_id.inner())?;
479 }
480
481 for strategy_id in &self.strategy_ids {
482 log::debug!("Starting strategy '{strategy_id}'");
483 start_component(&strategy_id.inner())?;
484 }
485
486 for exec_algorithm_id in &self.exec_algorithm_ids {
487 log::debug!("Starting execution algorithm '{exec_algorithm_id}'");
488 start_component(&exec_algorithm_id.inner())?;
489 }
490
491 Ok(())
492 }
493
494 pub fn stop_components(&mut self) -> anyhow::Result<()> {
500 for actor_id in &self.actor_ids {
501 log::debug!("Stopping actor '{actor_id}'");
502 stop_component(&actor_id.inner())?;
503 }
504
505 for exec_algorithm_id in &self.exec_algorithm_ids {
506 log::debug!("Stopping execution algorithm '{exec_algorithm_id}'");
507 stop_component(&exec_algorithm_id.inner())?;
508 }
509
510 for strategy_id in &self.strategy_ids {
511 log::debug!("Stopping strategy '{strategy_id}'");
512 stop_component(&strategy_id.inner())?;
513 }
514
515 Ok(())
516 }
517
518 pub fn reset_components(&mut self) -> anyhow::Result<()> {
524 for actor_id in &self.actor_ids {
525 log::debug!("Resetting actor '{actor_id}'");
526 reset_component(&actor_id.inner())?;
527 }
528
529 for strategy_id in &self.strategy_ids {
530 log::debug!("Resetting strategy '{strategy_id}'");
531 reset_component(&strategy_id.inner())?;
532 }
533
534 for exec_algorithm_id in &self.exec_algorithm_ids {
535 log::debug!("Resetting execution algorithm '{exec_algorithm_id}'");
536 reset_component(&exec_algorithm_id.inner())?;
537 }
538
539 Ok(())
540 }
541
542 pub fn dispose_components(&mut self) -> anyhow::Result<()> {
548 for actor_id in &self.actor_ids {
549 log::debug!("Disposing actor '{actor_id}'");
550 dispose_component(&actor_id.inner())?;
551 }
552
553 for strategy_id in &self.strategy_ids {
554 log::debug!("Disposing strategy '{strategy_id}'");
555 dispose_component(&strategy_id.inner())?;
556 }
557
558 for exec_algorithm_id in &self.exec_algorithm_ids {
559 log::debug!("Disposing execution algorithm '{exec_algorithm_id}'");
560 dispose_component(&exec_algorithm_id.inner())?;
561 }
562
563 self.actor_ids.clear();
564 self.strategy_ids.clear();
565 self.exec_algorithm_ids.clear();
566 self.clocks.clear();
567
568 Ok(())
569 }
570
571 pub fn initialize(&mut self) -> anyhow::Result<()> {
579 let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
580 self.state = new_state;
581
582 Ok(())
583 }
584
585 fn on_start(&mut self) -> anyhow::Result<()> {
586 self.start_components()?;
587
588 self.ts_started = Some(self.clock.borrow().timestamp_ns());
590
591 Ok(())
592 }
593
594 fn on_stop(&mut self) -> anyhow::Result<()> {
595 self.stop_components()?;
596
597 self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
598
599 Ok(())
600 }
601
602 fn on_reset(&mut self) -> anyhow::Result<()> {
603 self.reset_components()?;
604
605 self.ts_started = None;
606 self.ts_stopped = None;
607
608 Ok(())
609 }
610
611 fn on_dispose(&mut self) -> anyhow::Result<()> {
612 if self.is_running() {
613 self.stop()?;
614 }
615
616 self.dispose_components()?;
617
618 Ok(())
619 }
620}
621
622impl Component for Trader {
623 fn component_id(&self) -> ComponentId {
624 ComponentId::new(format!("Trader-{}", self.trader_id))
625 }
626
627 fn state(&self) -> ComponentState {
628 self.state
629 }
630
631 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
632 self.state = self.state.transition(&trigger)?;
633 log::info!("{}", self.state.variant_name());
634 Ok(())
635 }
636
637 fn register(
638 &mut self,
639 _trader_id: TraderId,
640 _clock: Rc<RefCell<dyn Clock>>,
641 _cache: Rc<RefCell<Cache>>,
642 ) -> anyhow::Result<()> {
643 anyhow::bail!("Trader cannot register with itself")
644 }
645
646 fn on_start(&mut self) -> anyhow::Result<()> {
647 Self::on_start(self)
648 }
649
650 fn on_stop(&mut self) -> anyhow::Result<()> {
651 Self::on_stop(self)
652 }
653
654 fn on_reset(&mut self) -> anyhow::Result<()> {
655 Self::on_reset(self)
656 }
657
658 fn on_dispose(&mut self) -> anyhow::Result<()> {
659 Self::on_dispose(self)
660 }
661}
662
663#[cfg(test)]
664mod tests {
665 use std::{
666 cell::RefCell,
667 ops::{Deref, DerefMut},
668 rc::Rc,
669 };
670
671 use nautilus_common::{
672 actor::{DataActorCore, data_actor::DataActorConfig},
673 cache::Cache,
674 clock::TestClock,
675 enums::{ComponentState, Environment},
676 msgbus::MessageBus,
677 };
678 use nautilus_core::UUID4;
679 use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
680 use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
681 use nautilus_model::identifiers::{ActorId, ComponentId, TraderId};
682 use nautilus_portfolio::portfolio::Portfolio;
683 use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
684 use nautilus_trading::strategy::{
685 Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
686 };
687 use rstest::rstest;
688
689 use super::*;
690
691 #[derive(Debug)]
693 struct TestDataActor {
694 core: DataActorCore,
695 }
696
697 impl TestDataActor {
698 fn new(config: DataActorConfig) -> Self {
699 Self {
700 core: DataActorCore::new(config),
701 }
702 }
703 }
704
705 impl DataActor for TestDataActor {}
706
707 impl Deref for TestDataActor {
708 type Target = DataActorCore;
709 fn deref(&self) -> &Self::Target {
710 &self.core
711 }
712 }
713
714 impl DerefMut for TestDataActor {
715 fn deref_mut(&mut self) -> &mut Self::Target {
716 &mut self.core
717 }
718 }
719
720 #[derive(Debug)]
722 struct TestStrategy {
723 core: StrategyCore,
724 }
725
726 impl TestStrategy {
727 fn new(config: StrategyConfig) -> Self {
728 Self {
729 core: StrategyCore::new(config),
730 }
731 }
732 }
733
734 impl DataActor for TestStrategy {}
735
736 impl Deref for TestStrategy {
738 type Target = DataActorCore;
739 fn deref(&self) -> &Self::Target {
740 &self.core
741 }
742 }
743
744 impl DerefMut for TestStrategy {
745 fn deref_mut(&mut self) -> &mut Self::Target {
746 &mut self.core
747 }
748 }
749
750 impl StrategyTrait for TestStrategy {
751 fn core_mut(&mut self) -> &mut StrategyCore {
752 &mut self.core
753 }
754 }
755
756 #[allow(clippy::type_complexity)]
757 fn create_trader_components() -> (
758 Rc<RefCell<MessageBus>>,
759 Rc<RefCell<Cache>>,
760 Rc<RefCell<Portfolio>>,
761 Rc<RefCell<DataEngine>>,
762 Rc<RefCell<RiskEngine>>,
763 Rc<RefCell<ExecutionEngine>>,
764 Rc<RefCell<TestClock>>,
765 ) {
766 let trader_id = TraderId::default();
767 let instance_id = UUID4::new();
768 let clock = Rc::new(RefCell::new(TestClock::new()));
769 clock.borrow_mut().set_time(1_000_000_000u64.into());
771 let msgbus = Rc::new(RefCell::new(MessageBus::new(
772 trader_id,
773 instance_id,
774 Some("test".to_string()),
775 None,
776 )));
777 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
778 let portfolio = Rc::new(RefCell::new(Portfolio::new(
779 cache.clone(),
780 clock.clone() as Rc<RefCell<dyn Clock>>,
781 None,
782 )));
783 let data_engine = Rc::new(RefCell::new(DataEngine::new(
784 clock.clone(),
785 cache.clone(),
786 Some(DataEngineConfig::default()),
787 )));
788
789 let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
791 let risk_clock = Rc::new(RefCell::new(TestClock::new()));
792 let risk_portfolio = Portfolio::new(
793 risk_cache.clone(),
794 risk_clock.clone() as Rc<RefCell<dyn Clock>>,
795 None,
796 );
797 let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
798 RiskEngineConfig::default(),
799 risk_portfolio,
800 risk_clock as Rc<RefCell<dyn Clock>>,
801 risk_cache,
802 )));
803 let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
804 clock.clone(),
805 cache.clone(),
806 Some(ExecutionEngineConfig::default()),
807 )));
808
809 (
810 msgbus,
811 cache,
812 portfolio,
813 data_engine,
814 risk_engine,
815 exec_engine,
816 clock,
817 )
818 }
819
820 #[rstest]
821 fn test_trader_creation() {
822 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
823 create_trader_components();
824 let trader_id = TraderId::default();
825 let instance_id = UUID4::new();
826
827 let trader = Trader::new(
828 trader_id,
829 instance_id,
830 Environment::Backtest,
831 clock,
832 cache,
833 portfolio,
834 );
835
836 assert_eq!(trader.trader_id(), trader_id);
837 assert_eq!(trader.instance_id(), instance_id);
838 assert_eq!(trader.environment(), Environment::Backtest);
839 assert_eq!(trader.state(), ComponentState::PreInitialized);
840 assert_eq!(trader.actor_count(), 0);
841 assert_eq!(trader.strategy_count(), 0);
842 assert_eq!(trader.exec_algorithm_count(), 0);
843 assert_eq!(trader.component_count(), 0);
844 assert!(!trader.is_running());
845 assert!(!trader.is_stopped());
846 assert!(!trader.is_disposed());
847 assert!(trader.ts_created() > 0);
848 assert!(trader.ts_started().is_none());
849 assert!(trader.ts_stopped().is_none());
850 }
851
852 #[rstest]
853 fn test_trader_component_id() {
854 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
855 create_trader_components();
856 let trader_id = TraderId::from("TRADER-001");
857 let instance_id = UUID4::new();
858
859 let trader = Trader::new(
860 trader_id,
861 instance_id,
862 Environment::Backtest,
863 clock,
864 cache,
865 portfolio,
866 );
867
868 assert_eq!(
869 trader.component_id(),
870 ComponentId::from("Trader-TRADER-001")
871 );
872 }
873
874 #[rstest]
875 fn test_add_actor_success() {
876 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
877 create_trader_components();
878 let trader_id = TraderId::default();
879 let instance_id = UUID4::new();
880
881 let mut trader = Trader::new(
882 trader_id,
883 instance_id,
884 Environment::Backtest,
885 clock,
886 cache,
887 portfolio,
888 );
889
890 let actor = TestDataActor::new(DataActorConfig::default());
891 let actor_id = actor.actor_id();
892
893 let result = trader.add_actor(actor);
894 assert!(result.is_ok());
895 assert_eq!(trader.actor_count(), 1);
896 assert_eq!(trader.component_count(), 1);
897 assert!(trader.actor_ids().contains(&actor_id));
898 }
899
900 #[rstest]
901 fn test_add_duplicate_actor_fails() {
902 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
903 create_trader_components();
904 let trader_id = TraderId::default();
905 let instance_id = UUID4::new();
906
907 let mut trader = Trader::new(
908 trader_id,
909 instance_id,
910 Environment::Backtest,
911 clock,
912 cache,
913 portfolio,
914 );
915
916 let config = DataActorConfig {
917 actor_id: Some(ActorId::from("TestActor")),
918 ..Default::default()
919 };
920 let actor1 = TestDataActor::new(config.clone());
921 let actor2 = TestDataActor::new(config);
922
923 assert!(trader.add_actor(actor1).is_ok());
925 assert_eq!(trader.actor_count(), 1);
926
927 let result = trader.add_actor(actor2);
929 assert!(result.is_err());
930 assert!(
931 result
932 .unwrap_err()
933 .to_string()
934 .contains("already registered")
935 );
936 assert_eq!(trader.actor_count(), 1);
937 }
938
939 #[rstest]
940 fn test_add_strategy_success() {
941 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
942 create_trader_components();
943 let trader_id = TraderId::default();
944 let instance_id = UUID4::new();
945
946 let mut trader = Trader::new(
947 trader_id,
948 instance_id,
949 Environment::Backtest,
950 clock,
951 cache,
952 portfolio,
953 );
954
955 let config = StrategyConfig {
956 strategy_id: Some(StrategyId::from("Test-Strategy")),
957 ..Default::default()
958 };
959 let strategy = TestStrategy::new(config);
960 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
961
962 let result = trader.add_strategy(strategy);
963 assert!(result.is_ok());
964 assert_eq!(trader.strategy_count(), 1);
965 assert_eq!(trader.component_count(), 1);
966 assert!(trader.strategy_ids().contains(&strategy_id));
967 }
968
969 #[rstest]
970 fn test_add_exec_algorithm_success() {
971 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
972 create_trader_components();
973 let trader_id = TraderId::default();
974 let instance_id = UUID4::new();
975
976 let mut trader = Trader::new(
977 trader_id,
978 instance_id,
979 Environment::Backtest,
980 clock,
981 cache,
982 portfolio,
983 );
984
985 let config = DataActorConfig {
986 actor_id: Some(ActorId::from("TestExecAlgorithm")),
987 ..Default::default()
988 };
989 let exec_algorithm = TestDataActor::new(config);
990 let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
991
992 let result = trader.add_exec_algorithm(exec_algorithm);
993 assert!(result.is_ok());
994 assert_eq!(trader.exec_algorithm_count(), 1);
995 assert_eq!(trader.component_count(), 1);
996 assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
997 }
998
999 #[rstest]
1000 fn test_component_lifecycle() {
1001 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1002 create_trader_components();
1003 let trader_id = TraderId::default();
1004 let instance_id = UUID4::new();
1005
1006 let mut trader = Trader::new(
1007 trader_id,
1008 instance_id,
1009 Environment::Backtest,
1010 clock,
1011 cache,
1012 portfolio,
1013 );
1014
1015 let actor = TestDataActor::new(DataActorConfig::default());
1017
1018 let strategy_config = StrategyConfig {
1019 strategy_id: Some(StrategyId::from("Test-Strategy")),
1020 ..Default::default()
1021 };
1022 let strategy = TestStrategy::new(strategy_config);
1023
1024 let exec_algorithm_config = DataActorConfig {
1025 actor_id: Some(ActorId::from("TestExecAlgorithm")),
1026 ..Default::default()
1027 };
1028 let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1029
1030 assert!(trader.add_actor(actor).is_ok());
1031 assert!(trader.add_strategy(strategy).is_ok());
1032 assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1033 assert_eq!(trader.component_count(), 3);
1034
1035 let start_result = trader.start_components();
1037 assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1038
1039 assert!(trader.stop_components().is_ok());
1041
1042 assert!(trader.reset_components().is_ok());
1044
1045 assert!(trader.dispose_components().is_ok());
1047 assert_eq!(trader.component_count(), 0);
1048 }
1049
1050 #[rstest]
1051 fn test_trader_component_lifecycle() {
1052 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1053 create_trader_components();
1054 let trader_id = TraderId::default();
1055 let instance_id = UUID4::new();
1056
1057 let mut trader = Trader::new(
1058 trader_id,
1059 instance_id,
1060 Environment::Backtest,
1061 clock,
1062 cache,
1063 portfolio,
1064 );
1065
1066 assert_eq!(trader.state(), ComponentState::PreInitialized);
1068 assert!(!trader.is_running());
1069 assert!(!trader.is_stopped());
1070 assert!(!trader.is_disposed());
1071
1072 assert!(trader.start().is_err());
1074
1075 trader.initialize().unwrap();
1077
1078 assert!(trader.start().is_ok());
1080 assert_eq!(trader.state(), ComponentState::Running);
1081 assert!(trader.is_running());
1082 assert!(trader.ts_started().is_some());
1083
1084 assert!(trader.stop().is_ok());
1086 assert_eq!(trader.state(), ComponentState::Stopped);
1087 assert!(trader.is_stopped());
1088 assert!(trader.ts_stopped().is_some());
1089
1090 assert!(trader.reset().is_ok());
1092 assert_eq!(trader.state(), ComponentState::Ready);
1093 assert!(trader.ts_started().is_none());
1094 assert!(trader.ts_stopped().is_none());
1095
1096 assert!(trader.dispose().is_ok());
1098 assert_eq!(trader.state(), ComponentState::Disposed);
1099 assert!(trader.is_disposed());
1100 }
1101
1102 #[rstest]
1103 fn test_cannot_add_components_while_running() {
1104 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1105 create_trader_components();
1106 let trader_id = TraderId::default();
1107 let instance_id = UUID4::new();
1108
1109 let mut trader = Trader::new(
1110 trader_id,
1111 instance_id,
1112 Environment::Backtest,
1113 clock,
1114 cache,
1115 portfolio,
1116 );
1117
1118 trader.state = ComponentState::Running;
1120
1121 let actor = TestDataActor::new(DataActorConfig::default());
1122 let result = trader.add_actor(actor);
1123 assert!(result.is_err());
1124 assert!(
1125 result
1126 .unwrap_err()
1127 .to_string()
1128 .contains("while trader is running")
1129 );
1130 }
1131
1132 #[rstest]
1133 fn test_create_component_clock_backtest_vs_live() {
1134 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1135 create_trader_components();
1136 let trader_id = TraderId::default();
1137 let instance_id = UUID4::new();
1138
1139 let trader_backtest = Trader::new(
1141 trader_id,
1142 instance_id,
1143 Environment::Backtest,
1144 clock.clone(),
1145 cache.clone(),
1146 portfolio.clone(),
1147 );
1148
1149 let backtest_clock = trader_backtest.create_component_clock();
1150 assert_ne!(
1152 backtest_clock.as_ptr() as *const _,
1153 clock.as_ptr() as *const _
1154 );
1155
1156 let trader_live = Trader::new(
1158 trader_id,
1159 instance_id,
1160 Environment::Live,
1161 clock.clone(),
1162 cache,
1163 portfolio,
1164 );
1165
1166 let live_clock = trader_live.create_component_clock();
1167 assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1169 }
1170}