1use std::{cell::RefCell, fmt::Debug, rc::Rc};
23
24use ahash::AHashMap;
25use nautilus_common::{
26 actor::{DataActor, registry::try_get_actor_unchecked},
27 cache::Cache,
28 clock::{Clock, TestClock},
29 component::{
30 Component, dispose_component, register_component_actor, reset_component, start_component,
31 stop_component,
32 },
33 enums::{ComponentState, ComponentTrigger, Environment},
34 msgbus,
35 msgbus::{
36 TypedHandler,
37 switchboard::{get_event_orders_topic, get_event_positions_topic},
38 },
39 timer::{TimeEvent, TimeEventCallback},
40};
41use nautilus_core::{UUID4, UnixNanos};
42use nautilus_model::{
43 events::{OrderEventAny, PositionEvent},
44 identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
45};
46use nautilus_portfolio::portfolio::Portfolio;
47use nautilus_trading::strategy::Strategy;
48use ustr::Ustr;
49
/// Registry that tracks a trader's components and drives their lifecycle.
///
/// Holds the IDs of registered actors, strategies and execution algorithms,
/// the clock handed to each component, and the message-bus handler IDs that
/// were created for strategies so they can be removed again.
pub struct Trader {
    /// Identifier of this trader.
    pub trader_id: TraderId,
    /// Identifier of this running instance.
    pub instance_id: UUID4,
    /// Environment the trader runs in; decides which clock components receive.
    pub environment: Environment,
    /// Current lifecycle state of the trader itself.
    state: ComponentState,
    /// Trader-level clock, also used for the `ts_*` timestamps.
    clock: Rc<RefCell<dyn Clock>>,
    /// Cache handle passed to every component on registration.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio handle passed to strategies on registration.
    portfolio: Rc<RefCell<Portfolio>>,
    /// IDs of registered actors, in registration order.
    actor_ids: Vec<ActorId>,
    /// IDs of registered strategies, in registration order.
    strategy_ids: Vec<StrategyId>,
    /// Per-strategy closures run by `stop_components`; returning `true` lets
    /// the regular component stop proceed.
    strategy_stop_fns: AHashMap<StrategyId, Box<dyn FnMut() -> bool>>,
    /// Per-strategy `(order handler ID, position handler ID)` pairs.
    strategy_handler_ids: AHashMap<StrategyId, (Ustr, Ustr)>,
    /// IDs of registered execution algorithms, in registration order.
    exec_algorithm_ids: Vec<ExecAlgorithmId>,
    /// Clock given to each registered component, keyed by component ID.
    clocks: AHashMap<ComponentId, Rc<RefCell<dyn Clock>>>,
    /// Clock timestamp captured when the trader was constructed.
    ts_created: UnixNanos,
    /// Clock timestamp of the last start; cleared on reset.
    ts_started: Option<UnixNanos>,
    /// Clock timestamp of the last stop; cleared on reset.
    ts_stopped: Option<UnixNanos>,
}
99
100impl Debug for Trader {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 write!(f, "{:?}", stringify!(TraderId)) }
104}
105
106impl Trader {
107 #[must_use]
109 pub fn new(
110 trader_id: TraderId,
111 instance_id: UUID4,
112 environment: Environment,
113 clock: Rc<RefCell<dyn Clock>>,
114 cache: Rc<RefCell<Cache>>,
115 portfolio: Rc<RefCell<Portfolio>>,
116 ) -> Self {
117 let ts_created = clock.borrow().timestamp_ns();
118
119 Self {
120 trader_id,
121 instance_id,
122 environment,
123 state: ComponentState::PreInitialized,
124 clock,
125 cache,
126 portfolio,
127 actor_ids: Vec::new(),
128 strategy_ids: Vec::new(),
129 strategy_stop_fns: AHashMap::new(),
130 strategy_handler_ids: AHashMap::new(),
131 exec_algorithm_ids: Vec::new(),
132 clocks: AHashMap::new(),
133 ts_created,
134 ts_started: None,
135 ts_stopped: None,
136 }
137 }
138
139 #[must_use]
141 pub const fn trader_id(&self) -> TraderId {
142 self.trader_id
143 }
144
145 #[must_use]
147 pub const fn instance_id(&self) -> UUID4 {
148 self.instance_id
149 }
150
151 #[must_use]
153 pub const fn environment(&self) -> Environment {
154 self.environment
155 }
156
157 #[must_use]
159 pub const fn state(&self) -> ComponentState {
160 self.state
161 }
162
163 #[must_use]
165 pub const fn ts_created(&self) -> UnixNanos {
166 self.ts_created
167 }
168
169 #[must_use]
171 pub const fn ts_started(&self) -> Option<UnixNanos> {
172 self.ts_started
173 }
174
175 #[must_use]
177 pub const fn ts_stopped(&self) -> Option<UnixNanos> {
178 self.ts_stopped
179 }
180
181 #[must_use]
183 pub const fn actor_count(&self) -> usize {
184 self.actor_ids.len()
185 }
186
187 #[must_use]
189 pub const fn strategy_count(&self) -> usize {
190 self.strategy_ids.len()
191 }
192
193 #[must_use]
195 pub const fn exec_algorithm_count(&self) -> usize {
196 self.exec_algorithm_ids.len()
197 }
198
199 pub fn get_component_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
201 self.clocks.values().cloned().collect()
202 }
203
204 #[must_use]
206 pub const fn component_count(&self) -> usize {
207 self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
208 }
209
210 #[must_use]
212 pub fn actor_ids(&self) -> Vec<ActorId> {
213 self.actor_ids.clone()
214 }
215
216 #[must_use]
218 pub fn strategy_ids(&self) -> Vec<StrategyId> {
219 self.strategy_ids.clone()
220 }
221
222 #[must_use]
224 pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
225 self.exec_algorithm_ids.clone()
226 }
227
228 fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
233 match self.environment {
234 Environment::Backtest => {
235 Rc::new(RefCell::new(TestClock::new()))
237 }
238 Environment::Live | Environment::Sandbox => {
239 self.clock.clone()
241 }
242 }
243 }
244
245 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
253 where
254 T: DataActor + Component + Debug + 'static,
255 {
256 self.validate_component_registration()?;
257
258 let actor_id = actor.actor_id();
259
260 if self.actor_ids.contains(&actor_id) {
262 anyhow::bail!("Actor {actor_id} is already registered");
263 }
264
265 let clock = self.create_component_clock();
266 let component_id = ComponentId::new(actor_id.inner().as_str());
267 self.clocks.insert(component_id, clock.clone());
268
269 let mut actor_mut = actor;
270 actor_mut.register(self.trader_id, clock, self.cache.clone())?;
271
272 self.add_registered_actor(actor_mut)
273 }
274
275 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
287 where
288 F: FnOnce() -> anyhow::Result<T>,
289 T: DataActor + Component + Debug + 'static,
290 {
291 let actor = factory()?;
292
293 self.add_actor(actor)
294 }
295
296 pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
302 where
303 T: DataActor + Component + Debug + 'static,
304 {
305 let actor_id = actor.actor_id();
306
307 register_component_actor(actor);
309
310 self.actor_ids.push(actor_id);
312
313 log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
314
315 Ok(())
316 }
317
318 pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
328 if self.actor_ids.contains(&actor_id) {
330 anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
331 }
332
333 self.actor_ids.push(actor_id);
335
336 log::debug!(
337 "Added actor ID '{actor_id}' to trader {} for lifecycle management",
338 self.trader_id
339 );
340
341 Ok(())
342 }
343
344 pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
356 where
357 T: Strategy + Component + Debug + 'static,
358 {
359 self.validate_component_registration()?;
360
361 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
362
363 if self.strategy_ids.contains(&strategy_id) {
365 anyhow::bail!("Strategy {strategy_id} is already registered");
366 }
367
368 let clock = self.create_component_clock();
369 let component_id = strategy.component_id();
370 self.clocks.insert(component_id, clock.clone());
371
372 strategy.core_mut().register(
374 self.trader_id,
375 clock.clone(),
376 self.cache.clone(),
377 self.portfolio.clone(),
378 )?;
379
380 let actor_id = strategy.actor_id().inner();
382 let callback = TimeEventCallback::from(move |event: TimeEvent| {
383 if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
384 actor.handle_time_event(&event);
385 } else {
386 log::error!("Strategy {actor_id} not found for time event handling");
387 }
388 });
389 clock.borrow_mut().register_default_handler(callback);
390
391 strategy.initialize()?;
393
394 register_component_actor(strategy);
396
397 let order_topic = get_event_orders_topic(strategy_id);
398 let order_actor_id = actor_id;
399 let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
400 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
401 strategy.handle_order_event(event.clone());
402 } else {
403 log::error!("Strategy {order_actor_id} not found for order event handling");
404 }
405 });
406 let order_handler_id = order_handler.id();
407 msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
408
409 let position_topic = get_event_positions_topic(strategy_id);
410 let position_handler = TypedHandler::from(move |event: &PositionEvent| {
411 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
412 strategy.handle_position_event(event.clone());
413 } else {
414 log::error!("Strategy {actor_id} not found for position event handling");
415 }
416 });
417 let position_handler_id = position_handler.id();
418 msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
419
420 self.strategy_ids.push(strategy_id);
421 self.strategy_handler_ids
422 .insert(strategy_id, (order_handler_id, position_handler_id));
423
424 let stop_actor_id = actor_id;
425 let stop_fn = Box::new(move || -> bool {
426 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
427 Strategy::stop(&mut *strategy)
428 } else {
429 log::error!("Strategy {stop_actor_id} not found for stop");
430 true }
432 });
433 self.strategy_stop_fns.insert(strategy_id, stop_fn);
434
435 log::info!(
436 "Registered strategy {strategy_id} with trader {}",
437 self.trader_id
438 );
439
440 Ok(())
441 }
442
443 pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
454 where
455 T: DataActor + Component + Debug + 'static,
456 {
457 self.validate_component_registration()?;
458
459 let exec_algorithm_id =
460 ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
461
462 if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
464 anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
465 }
466
467 let clock = self.create_component_clock();
468 let component_id = exec_algorithm.component_id();
469 self.clocks.insert(component_id, clock.clone());
470
471 exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
472
473 register_component_actor(exec_algorithm);
475
476 self.exec_algorithm_ids.push(exec_algorithm_id);
477
478 log::info!(
479 "Registered execution algorithm {exec_algorithm_id} with trader {}",
480 self.trader_id
481 );
482
483 Ok(())
484 }
485
486 fn validate_component_registration(&self) -> anyhow::Result<()> {
488 match self.state {
489 ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
490 Ok(())
491 }
492 ComponentState::Running => {
493 anyhow::bail!("Cannot add components while trader is running")
494 }
495 ComponentState::Disposed => {
496 anyhow::bail!("Cannot add components to disposed trader")
497 }
498 _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
499 }
500 }
501
502 pub fn start_components(&mut self) -> anyhow::Result<()> {
508 for actor_id in &self.actor_ids {
509 log::debug!("Starting actor {actor_id}");
510 start_component(&actor_id.inner())?;
511 }
512
513 for strategy_id in &self.strategy_ids {
514 log::debug!("Starting strategy {strategy_id}");
515 start_component(&strategy_id.inner())?;
516 }
517
518 for exec_algorithm_id in &self.exec_algorithm_ids {
519 log::debug!("Starting execution algorithm {exec_algorithm_id}");
520 start_component(&exec_algorithm_id.inner())?;
521 }
522
523 Ok(())
524 }
525
526 pub fn stop_components(&mut self) -> anyhow::Result<()> {
532 for actor_id in &self.actor_ids {
533 log::debug!("Stopping actor {actor_id}");
534 stop_component(&actor_id.inner())?;
535 }
536
537 for exec_algorithm_id in &self.exec_algorithm_ids {
538 log::debug!("Stopping execution algorithm {exec_algorithm_id}");
539 stop_component(&exec_algorithm_id.inner())?;
540 }
541
542 for strategy_id in self.strategy_ids.clone() {
543 log::debug!("Stopping strategy {strategy_id}");
544 let should_proceed = self
545 .strategy_stop_fns
546 .get_mut(&strategy_id)
547 .is_none_or(|stop_fn| stop_fn());
548 if should_proceed {
549 stop_component(&strategy_id.inner())?;
550 }
551 }
552
553 Ok(())
554 }
555
556 pub fn reset_components(&mut self) -> anyhow::Result<()> {
562 for actor_id in &self.actor_ids {
563 log::debug!("Resetting actor {actor_id}");
564 reset_component(&actor_id.inner())?;
565 }
566
567 for strategy_id in &self.strategy_ids {
568 log::debug!("Resetting strategy {strategy_id}");
569 reset_component(&strategy_id.inner())?;
570 }
571
572 for exec_algorithm_id in &self.exec_algorithm_ids {
573 log::debug!("Resetting execution algorithm {exec_algorithm_id}");
574 reset_component(&exec_algorithm_id.inner())?;
575 }
576
577 Ok(())
578 }
579
580 pub fn dispose_components(&mut self) -> anyhow::Result<()> {
586 for actor_id in &self.actor_ids {
587 log::debug!("Disposing actor {actor_id}");
588 dispose_component(&actor_id.inner())?;
589 }
590
591 for strategy_id in &self.strategy_ids {
592 log::debug!("Disposing strategy {strategy_id}");
593 dispose_component(&strategy_id.inner())?;
594 }
595
596 for exec_algorithm_id in &self.exec_algorithm_ids {
597 log::debug!("Disposing execution algorithm {exec_algorithm_id}");
598 dispose_component(&exec_algorithm_id.inner())?;
599 }
600
601 self.actor_ids.clear();
602 self.strategy_ids.clear();
603 self.exec_algorithm_ids.clear();
604 self.clocks.clear();
605
606 Ok(())
607 }
608
609 pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
615 for strategy_id in &self.strategy_ids {
616 log::debug!("Disposing strategy {strategy_id}");
617 dispose_component(&strategy_id.inner())?;
618 let component_id = ComponentId::new(strategy_id.inner().as_str());
619 self.clocks.remove(&component_id);
620
621 if let Some((order_hid, position_hid)) = self.strategy_handler_ids.get(strategy_id) {
623 let order_topic = get_event_orders_topic(*strategy_id);
624 let position_topic = get_event_positions_topic(*strategy_id);
625 msgbus::remove_order_event_handler(order_topic.into(), *order_hid);
626 msgbus::remove_position_event_handler(position_topic.into(), *position_hid);
627 }
628 }
629
630 self.strategy_ids.clear();
631 self.strategy_stop_fns.clear();
632 self.strategy_handler_ids.clear();
633
634 Ok(())
635 }
636
637 pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
643 for exec_algorithm_id in &self.exec_algorithm_ids {
644 log::debug!("Disposing execution algorithm {exec_algorithm_id}");
645 dispose_component(&exec_algorithm_id.inner())?;
646 let component_id = ComponentId::new(exec_algorithm_id.inner().as_str());
647 self.clocks.remove(&component_id);
648 }
649
650 self.exec_algorithm_ids.clear();
651
652 Ok(())
653 }
654
655 pub fn initialize(&mut self) -> anyhow::Result<()> {
663 let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
664 self.state = new_state;
665
666 Ok(())
667 }
668
669 fn on_start(&mut self) -> anyhow::Result<()> {
670 self.start_components()?;
671
672 self.ts_started = Some(self.clock.borrow().timestamp_ns());
674
675 Ok(())
676 }
677
678 fn on_stop(&mut self) -> anyhow::Result<()> {
679 self.stop_components()?;
680
681 self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
682
683 Ok(())
684 }
685
686 fn on_reset(&mut self) -> anyhow::Result<()> {
687 self.reset_components()?;
688
689 self.ts_started = None;
690 self.ts_stopped = None;
691
692 Ok(())
693 }
694
695 fn on_dispose(&mut self) -> anyhow::Result<()> {
696 if self.is_running() {
697 self.stop()?;
698 }
699
700 self.dispose_components()?;
701
702 Ok(())
703 }
704}
705
706impl Component for Trader {
707 fn component_id(&self) -> ComponentId {
708 ComponentId::new(format!("Trader-{}", self.trader_id))
709 }
710
711 fn state(&self) -> ComponentState {
712 self.state
713 }
714
715 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
716 self.state = self.state.transition(&trigger)?;
717 log::info!("{}", self.state.variant_name());
718 Ok(())
719 }
720
721 fn register(
722 &mut self,
723 _trader_id: TraderId,
724 _clock: Rc<RefCell<dyn Clock>>,
725 _cache: Rc<RefCell<Cache>>,
726 ) -> anyhow::Result<()> {
727 anyhow::bail!("Trader cannot register with itself")
728 }
729
730 fn on_start(&mut self) -> anyhow::Result<()> {
731 Self::on_start(self)
732 }
733
734 fn on_stop(&mut self) -> anyhow::Result<()> {
735 Self::on_stop(self)
736 }
737
738 fn on_reset(&mut self) -> anyhow::Result<()> {
739 Self::on_reset(self)
740 }
741
742 fn on_dispose(&mut self) -> anyhow::Result<()> {
743 Self::on_dispose(self)
744 }
745}
746
747#[cfg(test)]
748mod tests {
749 use std::{
750 cell::RefCell,
751 ops::{Deref, DerefMut},
752 rc::Rc,
753 };
754
755 use nautilus_common::{
756 actor::{DataActorCore, data_actor::DataActorConfig},
757 cache::Cache,
758 clock::TestClock,
759 enums::{ComponentState, Environment},
760 msgbus,
761 msgbus::{MessageBus, TypedHandler, switchboard::get_event_orders_topic},
762 };
763 use nautilus_core::UUID4;
764 use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
765 use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
766 use nautilus_model::{
767 events::OrderAccepted,
768 identifiers::{ActorId, ComponentId, TraderId},
769 stubs::TestDefault,
770 };
771 use nautilus_portfolio::portfolio::Portfolio;
772 use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
773 use nautilus_trading::strategy::{
774 Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
775 };
776 use rstest::rstest;
777
778 use super::*;
779
780 #[derive(Debug)]
782 struct TestDataActor {
783 core: DataActorCore,
784 }
785
786 impl TestDataActor {
787 fn new(config: DataActorConfig) -> Self {
788 Self {
789 core: DataActorCore::new(config),
790 }
791 }
792 }
793
794 impl DataActor for TestDataActor {}
795
796 impl Deref for TestDataActor {
797 type Target = DataActorCore;
798 fn deref(&self) -> &Self::Target {
799 &self.core
800 }
801 }
802
803 impl DerefMut for TestDataActor {
804 fn deref_mut(&mut self) -> &mut Self::Target {
805 &mut self.core
806 }
807 }
808
809 #[derive(Debug)]
811 struct TestStrategy {
812 core: StrategyCore,
813 }
814
815 impl TestStrategy {
816 fn new(config: StrategyConfig) -> Self {
817 Self {
818 core: StrategyCore::new(config),
819 }
820 }
821 }
822
823 impl DataActor for TestStrategy {}
824
825 impl Deref for TestStrategy {
826 type Target = DataActorCore;
827 fn deref(&self) -> &Self::Target {
828 &self.core
829 }
830 }
831
832 impl DerefMut for TestStrategy {
833 fn deref_mut(&mut self) -> &mut Self::Target {
834 &mut self.core
835 }
836 }
837
838 impl StrategyTrait for TestStrategy {
839 fn core(&self) -> &StrategyCore {
840 &self.core
841 }
842
843 fn core_mut(&mut self) -> &mut StrategyCore {
844 &mut self.core
845 }
846 }
847
848 #[allow(clippy::type_complexity)]
849 fn create_trader_components() -> (
850 Rc<RefCell<MessageBus>>,
851 Rc<RefCell<Cache>>,
852 Rc<RefCell<Portfolio>>,
853 Rc<RefCell<DataEngine>>,
854 Rc<RefCell<RiskEngine>>,
855 Rc<RefCell<ExecutionEngine>>,
856 Rc<RefCell<TestClock>>,
857 ) {
858 let trader_id = TraderId::test_default();
859 let instance_id = UUID4::new();
860 let clock = Rc::new(RefCell::new(TestClock::new()));
861 clock.borrow_mut().set_time(1_000_000_000u64.into());
863 let msgbus = Rc::new(RefCell::new(MessageBus::new(
864 trader_id,
865 instance_id,
866 Some("test".to_string()),
867 None,
868 )));
869 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
870 let portfolio = Rc::new(RefCell::new(Portfolio::new(
871 cache.clone(),
872 clock.clone() as Rc<RefCell<dyn Clock>>,
873 None,
874 )));
875 let data_engine = Rc::new(RefCell::new(DataEngine::new(
876 clock.clone(),
877 cache.clone(),
878 Some(DataEngineConfig::default()),
879 )));
880
881 let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
883 let risk_clock = Rc::new(RefCell::new(TestClock::new()));
884 let risk_portfolio = Portfolio::new(
885 risk_cache.clone(),
886 risk_clock.clone() as Rc<RefCell<dyn Clock>>,
887 None,
888 );
889 let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
890 RiskEngineConfig::default(),
891 risk_portfolio,
892 risk_clock as Rc<RefCell<dyn Clock>>,
893 risk_cache,
894 )));
895 let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
896 clock.clone(),
897 cache.clone(),
898 Some(ExecutionEngineConfig::default()),
899 )));
900
901 (
902 msgbus,
903 cache,
904 portfolio,
905 data_engine,
906 risk_engine,
907 exec_engine,
908 clock,
909 )
910 }
911
912 #[rstest]
913 fn test_trader_creation() {
914 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
915 create_trader_components();
916 let trader_id = TraderId::test_default();
917 let instance_id = UUID4::new();
918
919 let trader = Trader::new(
920 trader_id,
921 instance_id,
922 Environment::Backtest,
923 clock,
924 cache,
925 portfolio,
926 );
927
928 assert_eq!(trader.trader_id(), trader_id);
929 assert_eq!(trader.instance_id(), instance_id);
930 assert_eq!(trader.environment(), Environment::Backtest);
931 assert_eq!(trader.state(), ComponentState::PreInitialized);
932 assert_eq!(trader.actor_count(), 0);
933 assert_eq!(trader.strategy_count(), 0);
934 assert_eq!(trader.exec_algorithm_count(), 0);
935 assert_eq!(trader.component_count(), 0);
936 assert!(!trader.is_running());
937 assert!(!trader.is_stopped());
938 assert!(!trader.is_disposed());
939 assert!(trader.ts_created() > 0);
940 assert!(trader.ts_started().is_none());
941 assert!(trader.ts_stopped().is_none());
942 }
943
944 #[rstest]
945 fn test_trader_component_id() {
946 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
947 create_trader_components();
948 let trader_id = TraderId::from("TRADER-001");
949 let instance_id = UUID4::new();
950
951 let trader = Trader::new(
952 trader_id,
953 instance_id,
954 Environment::Backtest,
955 clock,
956 cache,
957 portfolio,
958 );
959
960 assert_eq!(
961 trader.component_id(),
962 ComponentId::from("Trader-TRADER-001")
963 );
964 }
965
966 #[rstest]
967 fn test_add_actor_success() {
968 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
969 create_trader_components();
970 let trader_id = TraderId::test_default();
971 let instance_id = UUID4::new();
972
973 let mut trader = Trader::new(
974 trader_id,
975 instance_id,
976 Environment::Backtest,
977 clock,
978 cache,
979 portfolio,
980 );
981
982 let actor = TestDataActor::new(DataActorConfig::default());
983 let actor_id = actor.actor_id();
984
985 let result = trader.add_actor(actor);
986 assert!(result.is_ok());
987 assert_eq!(trader.actor_count(), 1);
988 assert_eq!(trader.component_count(), 1);
989 assert!(trader.actor_ids().contains(&actor_id));
990 }
991
992 #[rstest]
993 fn test_add_duplicate_actor_fails() {
994 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
995 create_trader_components();
996 let trader_id = TraderId::test_default();
997 let instance_id = UUID4::new();
998
999 let mut trader = Trader::new(
1000 trader_id,
1001 instance_id,
1002 Environment::Backtest,
1003 clock,
1004 cache,
1005 portfolio,
1006 );
1007
1008 let config = DataActorConfig {
1009 actor_id: Some(ActorId::from("TestActor")),
1010 ..Default::default()
1011 };
1012 let actor1 = TestDataActor::new(config.clone());
1013 let actor2 = TestDataActor::new(config);
1014
1015 assert!(trader.add_actor(actor1).is_ok());
1017 assert_eq!(trader.actor_count(), 1);
1018
1019 let result = trader.add_actor(actor2);
1021 assert!(result.is_err());
1022 assert!(
1023 result
1024 .unwrap_err()
1025 .to_string()
1026 .contains("already registered")
1027 );
1028 assert_eq!(trader.actor_count(), 1);
1029 }
1030
1031 #[rstest]
1032 fn test_add_strategy_success() {
1033 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1034 create_trader_components();
1035 let trader_id = TraderId::test_default();
1036 let instance_id = UUID4::new();
1037
1038 let mut trader = Trader::new(
1039 trader_id,
1040 instance_id,
1041 Environment::Backtest,
1042 clock,
1043 cache,
1044 portfolio,
1045 );
1046
1047 let config = StrategyConfig {
1048 strategy_id: Some(StrategyId::from("Test-Strategy")),
1049 ..Default::default()
1050 };
1051 let strategy = TestStrategy::new(config);
1052 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1053
1054 let result = trader.add_strategy(strategy);
1055 assert!(result.is_ok());
1056 assert_eq!(trader.strategy_count(), 1);
1057 assert_eq!(trader.component_count(), 1);
1058 assert!(trader.strategy_ids().contains(&strategy_id));
1059 }
1060
1061 #[rstest]
1062 fn test_add_exec_algorithm_success() {
1063 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1064 create_trader_components();
1065 let trader_id = TraderId::test_default();
1066 let instance_id = UUID4::new();
1067
1068 let mut trader = Trader::new(
1069 trader_id,
1070 instance_id,
1071 Environment::Backtest,
1072 clock,
1073 cache,
1074 portfolio,
1075 );
1076
1077 let config = DataActorConfig {
1078 actor_id: Some(ActorId::from("TestExecAlgorithm")),
1079 ..Default::default()
1080 };
1081 let exec_algorithm = TestDataActor::new(config);
1082 let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
1083
1084 let result = trader.add_exec_algorithm(exec_algorithm);
1085 assert!(result.is_ok());
1086 assert_eq!(trader.exec_algorithm_count(), 1);
1087 assert_eq!(trader.component_count(), 1);
1088 assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
1089 }
1090
1091 #[rstest]
1092 fn test_component_lifecycle() {
1093 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1094 create_trader_components();
1095 let trader_id = TraderId::test_default();
1096 let instance_id = UUID4::new();
1097
1098 let mut trader = Trader::new(
1099 trader_id,
1100 instance_id,
1101 Environment::Backtest,
1102 clock,
1103 cache,
1104 portfolio,
1105 );
1106
1107 let actor = TestDataActor::new(DataActorConfig::default());
1109
1110 let strategy_config = StrategyConfig {
1111 strategy_id: Some(StrategyId::from("Test-Strategy")),
1112 ..Default::default()
1113 };
1114 let strategy = TestStrategy::new(strategy_config);
1115
1116 let exec_algorithm_config = DataActorConfig {
1117 actor_id: Some(ActorId::from("TestExecAlgorithm")),
1118 ..Default::default()
1119 };
1120 let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1121
1122 assert!(trader.add_actor(actor).is_ok());
1123 assert!(trader.add_strategy(strategy).is_ok());
1124 assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1125 assert_eq!(trader.component_count(), 3);
1126
1127 let start_result = trader.start_components();
1129 assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1130
1131 assert!(trader.stop_components().is_ok());
1133
1134 assert!(trader.reset_components().is_ok());
1136
1137 assert!(trader.dispose_components().is_ok());
1139 assert_eq!(trader.component_count(), 0);
1140 }
1141
1142 #[rstest]
1143 fn test_trader_component_lifecycle() {
1144 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1145 create_trader_components();
1146 let trader_id = TraderId::test_default();
1147 let instance_id = UUID4::new();
1148
1149 let mut trader = Trader::new(
1150 trader_id,
1151 instance_id,
1152 Environment::Backtest,
1153 clock,
1154 cache,
1155 portfolio,
1156 );
1157
1158 assert_eq!(trader.state(), ComponentState::PreInitialized);
1160 assert!(!trader.is_running());
1161 assert!(!trader.is_stopped());
1162 assert!(!trader.is_disposed());
1163
1164 assert!(trader.start().is_err());
1166
1167 trader.initialize().unwrap();
1169
1170 assert!(trader.start().is_ok());
1172 assert_eq!(trader.state(), ComponentState::Running);
1173 assert!(trader.is_running());
1174 assert!(trader.ts_started().is_some());
1175
1176 assert!(trader.stop().is_ok());
1178 assert_eq!(trader.state(), ComponentState::Stopped);
1179 assert!(trader.is_stopped());
1180 assert!(trader.ts_stopped().is_some());
1181
1182 assert!(trader.reset().is_ok());
1184 assert_eq!(trader.state(), ComponentState::Ready);
1185 assert!(trader.ts_started().is_none());
1186 assert!(trader.ts_stopped().is_none());
1187
1188 assert!(trader.dispose().is_ok());
1190 assert_eq!(trader.state(), ComponentState::Disposed);
1191 assert!(trader.is_disposed());
1192 }
1193
1194 #[rstest]
1195 fn test_cannot_add_components_while_running() {
1196 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1197 create_trader_components();
1198 let trader_id = TraderId::test_default();
1199 let instance_id = UUID4::new();
1200
1201 let mut trader = Trader::new(
1202 trader_id,
1203 instance_id,
1204 Environment::Backtest,
1205 clock,
1206 cache,
1207 portfolio,
1208 );
1209
1210 trader.state = ComponentState::Running;
1212
1213 let actor = TestDataActor::new(DataActorConfig::default());
1214 let result = trader.add_actor(actor);
1215 assert!(result.is_err());
1216 assert!(
1217 result
1218 .unwrap_err()
1219 .to_string()
1220 .contains("while trader is running")
1221 );
1222 }
1223
1224 #[rstest]
1225 fn test_create_component_clock_backtest_vs_live() {
1226 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1227 create_trader_components();
1228 let trader_id = TraderId::test_default();
1229 let instance_id = UUID4::new();
1230
1231 let trader_backtest = Trader::new(
1233 trader_id,
1234 instance_id,
1235 Environment::Backtest,
1236 clock.clone(),
1237 cache.clone(),
1238 portfolio.clone(),
1239 );
1240
1241 let backtest_clock = trader_backtest.create_component_clock();
1242 assert_ne!(
1244 backtest_clock.as_ptr() as *const _,
1245 clock.as_ptr() as *const _
1246 );
1247
1248 let trader_live = Trader::new(
1250 trader_id,
1251 instance_id,
1252 Environment::Live,
1253 clock.clone(),
1254 cache,
1255 portfolio,
1256 );
1257
1258 let live_clock = trader_live.create_component_clock();
1259 assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1261 }
1262
1263 #[rstest]
1264 fn test_clear_strategies_preserves_other_handlers() {
1265 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1266 create_trader_components();
1267 let trader_id = TraderId::test_default();
1268 let instance_id = UUID4::new();
1269
1270 let mut trader = Trader::new(
1271 trader_id,
1272 instance_id,
1273 Environment::Backtest,
1274 clock,
1275 cache,
1276 portfolio,
1277 );
1278
1279 let config = StrategyConfig {
1280 strategy_id: Some(StrategyId::from("Test-Strategy")),
1281 ..Default::default()
1282 };
1283 let strategy = TestStrategy::new(config);
1284 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1285 trader.add_strategy(strategy).unwrap();
1286
1287 let ext_received = Rc::new(RefCell::new(0));
1289 let ext_clone = ext_received.clone();
1290 let ext_handler =
1291 TypedHandler::from_with_id("exec-algo-handler", move |_: &OrderEventAny| {
1292 *ext_clone.borrow_mut() += 1;
1293 });
1294 let order_topic = get_event_orders_topic(strategy_id);
1295 msgbus::subscribe_order_events(order_topic.into(), ext_handler, None);
1296
1297 trader.clear_strategies().unwrap();
1298 assert_eq!(trader.strategy_count(), 0);
1299
1300 let event = OrderEventAny::Accepted(OrderAccepted::test_default());
1301 msgbus::publish_order_event(order_topic, &event);
1302 assert_eq!(*ext_received.borrow(), 1);
1303 }
1304}