nautilus_system/
trader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Central orchestrator for managing actors, strategies, and execution algorithms.
17//!
18//! The `Trader` component serves as the primary coordination layer between the kernel
19//! and individual trading components. It manages component lifecycles, provides
20//! unique identification, and coordinates with system engines.
21
22use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
23
24use nautilus_common::{
25    actor::{DataActor, registry::try_get_actor_unchecked},
26    cache::Cache,
27    clock::{Clock, TestClock},
28    component::{
29        Component, dispose_component, register_component_actor, reset_component, start_component,
30        stop_component,
31    },
32    enums::{ComponentState, ComponentTrigger, Environment},
33    msgbus,
34    msgbus::{
35        handler::{ShareableMessageHandler, TypedMessageHandler},
36        switchboard::{get_event_orders_topic, get_event_positions_topic},
37    },
38    timer::{TimeEvent, TimeEventCallback},
39};
40use nautilus_core::{UUID4, UnixNanos};
41use nautilus_model::{
42    events::{OrderEventAny, PositionEvent},
43    identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
44};
45use nautilus_portfolio::portfolio::Portfolio;
46use nautilus_trading::strategy::Strategy;
47
48/// Central orchestrator for managing trading components.
49///
50/// The `Trader` manages the lifecycle and coordination of actors, strategies,
51/// and execution algorithms within the trading system. It provides component
52/// registration, state management, and integration with system engines.
53pub struct Trader {
54    /// The unique trader identifier.
55    pub trader_id: TraderId,
56    /// The unique instance identifier.
57    pub instance_id: UUID4,
58    /// The trading environment context.
59    pub environment: Environment,
60    /// Component state for lifecycle management.
61    state: ComponentState,
62    /// System clock for timestamping.
63    clock: Rc<RefCell<dyn Clock>>,
64    /// System cache for data storage.
65    cache: Rc<RefCell<Cache>>,
66    /// Portfolio reference for strategy registration.
67    portfolio: Rc<RefCell<Portfolio>>,
68    /// Registered actor IDs (actors stored in global registry).
69    actor_ids: Vec<ActorId>,
70    /// Registered strategy IDs (strategies stored in global registry).
71    strategy_ids: Vec<StrategyId>,
72    /// Registered exec algorithm IDs (algorithms stored in global registry).
73    exec_algorithm_ids: Vec<ExecAlgorithmId>,
74    /// Component clocks for individual components.
75    clocks: HashMap<ComponentId, Rc<RefCell<dyn Clock>>>, // TODO: TBD global clock?
76    /// Timestamp when the trader was created.
77    ts_created: UnixNanos,
78    /// Timestamp when the trader was last started.
79    ts_started: Option<UnixNanos>,
80    /// Timestamp when the trader was last stopped.
81    ts_stopped: Option<UnixNanos>,
82}
83
84impl Debug for Trader {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{:?}", stringify!(TraderId)) // TODO
87    }
88}
89
90impl Trader {
91    /// Creates a new [`Trader`] instance.
92    #[must_use]
93    pub fn new(
94        trader_id: TraderId,
95        instance_id: UUID4,
96        environment: Environment,
97        clock: Rc<RefCell<dyn Clock>>,
98        cache: Rc<RefCell<Cache>>,
99        portfolio: Rc<RefCell<Portfolio>>,
100    ) -> Self {
101        let ts_created = clock.borrow().timestamp_ns();
102
103        Self {
104            trader_id,
105            instance_id,
106            environment,
107            state: ComponentState::PreInitialized,
108            clock,
109            cache,
110            portfolio,
111            actor_ids: Vec::new(),
112            strategy_ids: Vec::new(),
113            exec_algorithm_ids: Vec::new(),
114            clocks: HashMap::new(),
115            ts_created,
116            ts_started: None,
117            ts_stopped: None,
118        }
119    }
120
121    /// Returns the trader ID.
122    #[must_use]
123    pub const fn trader_id(&self) -> TraderId {
124        self.trader_id
125    }
126
127    /// Returns the instance ID.
128    #[must_use]
129    pub const fn instance_id(&self) -> UUID4 {
130        self.instance_id
131    }
132
133    /// Returns the trading environment.
134    #[must_use]
135    pub const fn environment(&self) -> Environment {
136        self.environment
137    }
138
139    /// Returns the current component state.
140    #[must_use]
141    pub const fn state(&self) -> ComponentState {
142        self.state
143    }
144
145    /// Returns the timestamp when the trader was created (UNIX nanoseconds).
146    #[must_use]
147    pub const fn ts_created(&self) -> UnixNanos {
148        self.ts_created
149    }
150
151    /// Returns the timestamp when the trader was last started (UNIX nanoseconds).
152    #[must_use]
153    pub const fn ts_started(&self) -> Option<UnixNanos> {
154        self.ts_started
155    }
156
157    /// Returns the timestamp when the trader was last stopped (UNIX nanoseconds).
158    #[must_use]
159    pub const fn ts_stopped(&self) -> Option<UnixNanos> {
160        self.ts_stopped
161    }
162
163    /// Returns the number of registered actors.
164    #[must_use]
165    pub const fn actor_count(&self) -> usize {
166        self.actor_ids.len()
167    }
168
169    /// Returns the number of registered strategies.
170    #[must_use]
171    pub const fn strategy_count(&self) -> usize {
172        self.strategy_ids.len()
173    }
174
175    /// Returns the number of registered execution algorithms.
176    #[must_use]
177    pub const fn exec_algorithm_count(&self) -> usize {
178        self.exec_algorithm_ids.len()
179    }
180
181    /// Returns the total number of registered components.
182    #[must_use]
183    pub const fn component_count(&self) -> usize {
184        self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
185    }
186
187    /// Returns a list of all registered actor IDs.
188    #[must_use]
189    pub fn actor_ids(&self) -> Vec<ActorId> {
190        self.actor_ids.clone()
191    }
192
193    /// Returns a list of all registered strategy IDs.
194    #[must_use]
195    pub fn strategy_ids(&self) -> Vec<StrategyId> {
196        self.strategy_ids.clone()
197    }
198
199    /// Returns a list of all registered execution algorithm IDs.
200    #[must_use]
201    pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
202        self.exec_algorithm_ids.clone()
203    }
204
205    /// Creates a clock for a component.
206    ///
207    /// Creates a test clock in backtest environment, otherwise returns a reference
208    /// to the system clock.
209    fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
210        match self.environment {
211            Environment::Backtest => {
212                // Create individual test clock for component in backtest
213                Rc::new(RefCell::new(TestClock::new()))
214            }
215            Environment::Live | Environment::Sandbox => {
216                // Share system clock in live environments
217                self.clock.clone()
218            }
219        }
220    }
221
222    /// Adds an actor to the trader.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if:
227    /// - The trader is not in a valid state for adding components.
228    /// - An actor with the same ID is already registered.
229    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
230    where
231        T: DataActor + Component + Debug + 'static,
232    {
233        self.validate_component_registration()?;
234
235        let actor_id = actor.actor_id();
236
237        // Check for duplicate registration
238        if self.actor_ids.contains(&actor_id) {
239            anyhow::bail!("Actor '{actor_id}' is already registered");
240        }
241
242        let clock = self.create_component_clock();
243        let component_id = ComponentId::new(actor_id.inner().as_str());
244        self.clocks.insert(component_id, clock.clone());
245
246        let mut actor_mut = actor;
247        actor_mut.register(self.trader_id, clock, self.cache.clone())?;
248
249        self.add_registered_actor(actor_mut)
250    }
251
252    /// Adds an actor to the trader using a factory function.
253    ///
254    /// The factory function is called at registration time to create the actor,
255    /// avoiding cloning issues with non-cloneable actor types.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if:
260    /// - The factory function fails to create the actor.
261    /// - The trader is not in a valid state for adding components.
262    /// - An actor with the same ID is already registered.
263    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
264    where
265        F: FnOnce() -> anyhow::Result<T>,
266        T: DataActor + Component + Debug + 'static,
267    {
268        let actor = factory()?;
269
270        self.add_actor(actor)
271    }
272
273    /// Adds an already registered actor to the trader's component registry.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the actor cannot be registered in the component registry.
278    pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
279    where
280        T: DataActor + Component + Debug + 'static,
281    {
282        let actor_id = actor.actor_id();
283        let mem_addr = actor.mem_address();
284
285        // Register in both component and actor registries (this consumes the actor)
286        register_component_actor(actor);
287
288        // Store actor ID for lifecycle management
289        self.actor_ids.push(actor_id);
290
291        log::info!(
292            "Registered '{actor_id}' at mem_addr {mem_addr} with trader {}",
293            self.trader_id
294        );
295
296        Ok(())
297    }
298
299    /// Adds an actor ID to the trader's lifecycle management without consuming the actor.
300    ///
301    /// This is useful when the actor is already registered in the global component registry
302    /// but the trader needs to track it for lifecycle management. The caller is responsible
303    /// for ensuring the actor is properly registered in the global registries.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if the actor ID is already tracked by this trader.
308    pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
309        // Check for duplicate registration
310        if self.actor_ids.contains(&actor_id) {
311            anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
312        }
313
314        // Store actor ID for lifecycle management
315        self.actor_ids.push(actor_id);
316
317        log::debug!(
318            "Added actor ID '{actor_id}' to trader {} for lifecycle management",
319            self.trader_id
320        );
321
322        Ok(())
323    }
324
325    /// Adds a strategy to the trader.
326    ///
327    /// Strategies are registered in both the component registry (for lifecycle management)
328    /// and the actor registry (for data callbacks via msgbus). The strategy's `StrategyCore`
329    /// is also registered with the portfolio for order management.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if:
334    /// - The trader is not in a valid state for adding components.
335    /// - A strategy with the same ID is already registered.
336    pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
337    where
338        T: Strategy + Component + Debug + 'static,
339    {
340        self.validate_component_registration()?;
341
342        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
343
344        // Check for duplicate registration
345        if self.strategy_ids.contains(&strategy_id) {
346            anyhow::bail!("Strategy '{strategy_id}' is already registered");
347        }
348
349        let clock = self.create_component_clock();
350        let component_id = strategy.component_id();
351        self.clocks.insert(component_id, clock.clone());
352
353        // Register strategy core with portfolio for order management
354        strategy.core_mut().register(
355            self.trader_id,
356            clock.clone(),
357            self.cache.clone(),
358            self.portfolio.clone(),
359        )?;
360
361        // Register default time event handler for this strategy
362        let actor_id = strategy.actor_id().inner();
363        let callback = TimeEventCallback::from(move |event: TimeEvent| {
364            if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
365                actor.handle_time_event(&event);
366            } else {
367                log::error!("Strategy {actor_id} not found for time event handling");
368            }
369        });
370        clock.borrow_mut().register_default_handler(callback);
371
372        // Transition to Ready state
373        strategy.initialize()?;
374
375        // Register in both component and actor registries
376        register_component_actor(strategy);
377
378        let order_topic = get_event_orders_topic(strategy_id);
379        let order_actor_id = actor_id;
380        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
381            move |event: &OrderEventAny| {
382                if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
383                    strategy.handle_order_event(event.clone());
384                } else {
385                    log::error!("Strategy {order_actor_id} not found for order event handling");
386                }
387            },
388        )));
389        msgbus::subscribe_topic(order_topic, handler, None);
390
391        let position_topic = get_event_positions_topic(strategy_id);
392        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
393            move |event: &PositionEvent| {
394                if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
395                    strategy.handle_position_event(event.clone());
396                } else {
397                    log::error!("Strategy {actor_id} not found for position event handling");
398                }
399            },
400        )));
401        msgbus::subscribe_topic(position_topic, handler, None);
402
403        self.strategy_ids.push(strategy_id);
404        log::info!(
405            "Registered strategy '{strategy_id}' with trader {}",
406            self.trader_id
407        );
408
409        Ok(())
410    }
411
412    /// Adds an execution algorithm to the trader.
413    ///
414    /// Execution algorithms are registered in both the component registry (for lifecycle
415    /// management) and the actor registry (for data callbacks via msgbus).
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if:
420    /// - The trader is not in a valid state for adding components.
421    /// - An execution algorithm with the same ID is already registered.
422    pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
423    where
424        T: DataActor + Component + Debug + 'static,
425    {
426        self.validate_component_registration()?;
427
428        let exec_algorithm_id =
429            ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
430
431        // Check for duplicate registration
432        if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
433            anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
434        }
435
436        let clock = self.create_component_clock();
437        let component_id = exec_algorithm.component_id();
438        self.clocks.insert(component_id, clock.clone());
439
440        exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
441
442        // Register in both component and actor registries
443        register_component_actor(exec_algorithm);
444
445        self.exec_algorithm_ids.push(exec_algorithm_id);
446        log::info!(
447            "Registered execution algorithm '{exec_algorithm_id}' with trader {}",
448            self.trader_id
449        );
450
451        Ok(())
452    }
453
454    /// Validates that the trader is in a valid state for component registration.
455    fn validate_component_registration(&self) -> anyhow::Result<()> {
456        match self.state {
457            ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
458                Ok(())
459            }
460            ComponentState::Running => {
461                anyhow::bail!("Cannot add components while trader is running")
462            }
463            ComponentState::Disposed => {
464                anyhow::bail!("Cannot add components to disposed trader")
465            }
466            _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
467        }
468    }
469
470    /// Starts all registered components.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if any component fails to start.
475    pub fn start_components(&mut self) -> anyhow::Result<()> {
476        for actor_id in &self.actor_ids {
477            log::debug!("Starting actor '{actor_id}'");
478            start_component(&actor_id.inner())?;
479        }
480
481        for strategy_id in &self.strategy_ids {
482            log::debug!("Starting strategy '{strategy_id}'");
483            start_component(&strategy_id.inner())?;
484        }
485
486        for exec_algorithm_id in &self.exec_algorithm_ids {
487            log::debug!("Starting execution algorithm '{exec_algorithm_id}'");
488            start_component(&exec_algorithm_id.inner())?;
489        }
490
491        Ok(())
492    }
493
494    /// Stops all registered components.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if any component fails to stop.
499    pub fn stop_components(&mut self) -> anyhow::Result<()> {
500        for actor_id in &self.actor_ids {
501            log::debug!("Stopping actor '{actor_id}'");
502            stop_component(&actor_id.inner())?;
503        }
504
505        for exec_algorithm_id in &self.exec_algorithm_ids {
506            log::debug!("Stopping execution algorithm '{exec_algorithm_id}'");
507            stop_component(&exec_algorithm_id.inner())?;
508        }
509
510        for strategy_id in &self.strategy_ids {
511            log::debug!("Stopping strategy '{strategy_id}'");
512            stop_component(&strategy_id.inner())?;
513        }
514
515        Ok(())
516    }
517
518    /// Resets all registered components.
519    ///
520    /// # Errors
521    ///
522    /// Returns an error if any component fails to reset.
523    pub fn reset_components(&mut self) -> anyhow::Result<()> {
524        for actor_id in &self.actor_ids {
525            log::debug!("Resetting actor '{actor_id}'");
526            reset_component(&actor_id.inner())?;
527        }
528
529        for strategy_id in &self.strategy_ids {
530            log::debug!("Resetting strategy '{strategy_id}'");
531            reset_component(&strategy_id.inner())?;
532        }
533
534        for exec_algorithm_id in &self.exec_algorithm_ids {
535            log::debug!("Resetting execution algorithm '{exec_algorithm_id}'");
536            reset_component(&exec_algorithm_id.inner())?;
537        }
538
539        Ok(())
540    }
541
542    /// Disposes of all registered components.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error if any component fails to dispose.
547    pub fn dispose_components(&mut self) -> anyhow::Result<()> {
548        for actor_id in &self.actor_ids {
549            log::debug!("Disposing actor '{actor_id}'");
550            dispose_component(&actor_id.inner())?;
551        }
552
553        for strategy_id in &self.strategy_ids {
554            log::debug!("Disposing strategy '{strategy_id}'");
555            dispose_component(&strategy_id.inner())?;
556        }
557
558        for exec_algorithm_id in &self.exec_algorithm_ids {
559            log::debug!("Disposing execution algorithm '{exec_algorithm_id}'");
560            dispose_component(&exec_algorithm_id.inner())?;
561        }
562
563        self.actor_ids.clear();
564        self.strategy_ids.clear();
565        self.exec_algorithm_ids.clear();
566        self.clocks.clear();
567
568        Ok(())
569    }
570
571    /// Initializes the trader, transitioning from `PreInitialized` to `Ready` state.
572    ///
573    /// This method must be called before starting the trader.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the trader cannot be initialized from its current state.
578    pub fn initialize(&mut self) -> anyhow::Result<()> {
579        let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
580        self.state = new_state;
581
582        Ok(())
583    }
584
585    fn on_start(&mut self) -> anyhow::Result<()> {
586        self.start_components()?;
587
588        // Transition to running state
589        self.ts_started = Some(self.clock.borrow().timestamp_ns());
590
591        Ok(())
592    }
593
594    fn on_stop(&mut self) -> anyhow::Result<()> {
595        self.stop_components()?;
596
597        self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
598
599        Ok(())
600    }
601
602    fn on_reset(&mut self) -> anyhow::Result<()> {
603        self.reset_components()?;
604
605        self.ts_started = None;
606        self.ts_stopped = None;
607
608        Ok(())
609    }
610
611    fn on_dispose(&mut self) -> anyhow::Result<()> {
612        if self.is_running() {
613            self.stop()?;
614        }
615
616        self.dispose_components()?;
617
618        Ok(())
619    }
620}
621
622impl Component for Trader {
623    fn component_id(&self) -> ComponentId {
624        ComponentId::new(format!("Trader-{}", self.trader_id))
625    }
626
627    fn state(&self) -> ComponentState {
628        self.state
629    }
630
631    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
632        self.state = self.state.transition(&trigger)?;
633        log::info!("{}", self.state.variant_name());
634        Ok(())
635    }
636
637    fn register(
638        &mut self,
639        _trader_id: TraderId,
640        _clock: Rc<RefCell<dyn Clock>>,
641        _cache: Rc<RefCell<Cache>>,
642    ) -> anyhow::Result<()> {
643        anyhow::bail!("Trader cannot register with itself")
644    }
645
646    fn on_start(&mut self) -> anyhow::Result<()> {
647        Self::on_start(self)
648    }
649
650    fn on_stop(&mut self) -> anyhow::Result<()> {
651        Self::on_stop(self)
652    }
653
654    fn on_reset(&mut self) -> anyhow::Result<()> {
655        Self::on_reset(self)
656    }
657
658    fn on_dispose(&mut self) -> anyhow::Result<()> {
659        Self::on_dispose(self)
660    }
661}
662
663#[cfg(test)]
664mod tests {
665    use std::{
666        cell::RefCell,
667        ops::{Deref, DerefMut},
668        rc::Rc,
669    };
670
671    use nautilus_common::{
672        actor::{DataActorCore, data_actor::DataActorConfig},
673        cache::Cache,
674        clock::TestClock,
675        enums::{ComponentState, Environment},
676        msgbus::MessageBus,
677    };
678    use nautilus_core::UUID4;
679    use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
680    use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
681    use nautilus_model::identifiers::{ActorId, ComponentId, TraderId};
682    use nautilus_portfolio::portfolio::Portfolio;
683    use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
684    use nautilus_trading::strategy::{
685        Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
686    };
687    use rstest::rstest;
688
689    use super::*;
690
691    // Simple DataActor wrapper for testing
692    #[derive(Debug)]
693    struct TestDataActor {
694        core: DataActorCore,
695    }
696
697    impl TestDataActor {
698        fn new(config: DataActorConfig) -> Self {
699            Self {
700                core: DataActorCore::new(config),
701            }
702        }
703    }
704
705    impl DataActor for TestDataActor {}
706
707    impl Deref for TestDataActor {
708        type Target = DataActorCore;
709        fn deref(&self) -> &Self::Target {
710            &self.core
711        }
712    }
713
714    impl DerefMut for TestDataActor {
715        fn deref_mut(&mut self) -> &mut Self::Target {
716            &mut self.core
717        }
718    }
719
720    // Simple Strategy wrapper for testing
721    #[derive(Debug)]
722    struct TestStrategy {
723        core: StrategyCore,
724    }
725
726    impl TestStrategy {
727        fn new(config: StrategyConfig) -> Self {
728            Self {
729                core: StrategyCore::new(config),
730            }
731        }
732    }
733
734    impl DataActor for TestStrategy {}
735
736    // Deref through StrategyCore to DataActorCore
737    impl Deref for TestStrategy {
738        type Target = DataActorCore;
739        fn deref(&self) -> &Self::Target {
740            &self.core
741        }
742    }
743
744    impl DerefMut for TestStrategy {
745        fn deref_mut(&mut self) -> &mut Self::Target {
746            &mut self.core
747        }
748    }
749
750    impl StrategyTrait for TestStrategy {
751        fn core_mut(&mut self) -> &mut StrategyCore {
752            &mut self.core
753        }
754    }
755
756    #[allow(clippy::type_complexity)]
757    fn create_trader_components() -> (
758        Rc<RefCell<MessageBus>>,
759        Rc<RefCell<Cache>>,
760        Rc<RefCell<Portfolio>>,
761        Rc<RefCell<DataEngine>>,
762        Rc<RefCell<RiskEngine>>,
763        Rc<RefCell<ExecutionEngine>>,
764        Rc<RefCell<TestClock>>,
765    ) {
766        let trader_id = TraderId::default();
767        let instance_id = UUID4::new();
768        let clock = Rc::new(RefCell::new(TestClock::new()));
769        // Set the clock to a non-zero time for test purposes
770        clock.borrow_mut().set_time(1_000_000_000u64.into());
771        let msgbus = Rc::new(RefCell::new(MessageBus::new(
772            trader_id,
773            instance_id,
774            Some("test".to_string()),
775            None,
776        )));
777        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
778        let portfolio = Rc::new(RefCell::new(Portfolio::new(
779            cache.clone(),
780            clock.clone() as Rc<RefCell<dyn Clock>>,
781            None,
782        )));
783        let data_engine = Rc::new(RefCell::new(DataEngine::new(
784            clock.clone(),
785            cache.clone(),
786            Some(DataEngineConfig::default()),
787        )));
788
789        // Create separate cache and clock instances for RiskEngine to avoid borrowing conflicts
790        let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
791        let risk_clock = Rc::new(RefCell::new(TestClock::new()));
792        let risk_portfolio = Portfolio::new(
793            risk_cache.clone(),
794            risk_clock.clone() as Rc<RefCell<dyn Clock>>,
795            None,
796        );
797        let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
798            RiskEngineConfig::default(),
799            risk_portfolio,
800            risk_clock as Rc<RefCell<dyn Clock>>,
801            risk_cache,
802        )));
803        let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
804            clock.clone(),
805            cache.clone(),
806            Some(ExecutionEngineConfig::default()),
807        )));
808
809        (
810            msgbus,
811            cache,
812            portfolio,
813            data_engine,
814            risk_engine,
815            exec_engine,
816            clock,
817        )
818    }
819
820    #[rstest]
821    fn test_trader_creation() {
822        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
823            create_trader_components();
824        let trader_id = TraderId::default();
825        let instance_id = UUID4::new();
826
827        let trader = Trader::new(
828            trader_id,
829            instance_id,
830            Environment::Backtest,
831            clock,
832            cache,
833            portfolio,
834        );
835
836        assert_eq!(trader.trader_id(), trader_id);
837        assert_eq!(trader.instance_id(), instance_id);
838        assert_eq!(trader.environment(), Environment::Backtest);
839        assert_eq!(trader.state(), ComponentState::PreInitialized);
840        assert_eq!(trader.actor_count(), 0);
841        assert_eq!(trader.strategy_count(), 0);
842        assert_eq!(trader.exec_algorithm_count(), 0);
843        assert_eq!(trader.component_count(), 0);
844        assert!(!trader.is_running());
845        assert!(!trader.is_stopped());
846        assert!(!trader.is_disposed());
847        assert!(trader.ts_created() > 0);
848        assert!(trader.ts_started().is_none());
849        assert!(trader.ts_stopped().is_none());
850    }
851
852    #[rstest]
853    fn test_trader_component_id() {
854        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
855            create_trader_components();
856        let trader_id = TraderId::from("TRADER-001");
857        let instance_id = UUID4::new();
858
859        let trader = Trader::new(
860            trader_id,
861            instance_id,
862            Environment::Backtest,
863            clock,
864            cache,
865            portfolio,
866        );
867
868        assert_eq!(
869            trader.component_id(),
870            ComponentId::from("Trader-TRADER-001")
871        );
872    }
873
874    #[rstest]
875    fn test_add_actor_success() {
876        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
877            create_trader_components();
878        let trader_id = TraderId::default();
879        let instance_id = UUID4::new();
880
881        let mut trader = Trader::new(
882            trader_id,
883            instance_id,
884            Environment::Backtest,
885            clock,
886            cache,
887            portfolio,
888        );
889
890        let actor = TestDataActor::new(DataActorConfig::default());
891        let actor_id = actor.actor_id();
892
893        let result = trader.add_actor(actor);
894        assert!(result.is_ok());
895        assert_eq!(trader.actor_count(), 1);
896        assert_eq!(trader.component_count(), 1);
897        assert!(trader.actor_ids().contains(&actor_id));
898    }
899
900    #[rstest]
901    fn test_add_duplicate_actor_fails() {
902        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
903            create_trader_components();
904        let trader_id = TraderId::default();
905        let instance_id = UUID4::new();
906
907        let mut trader = Trader::new(
908            trader_id,
909            instance_id,
910            Environment::Backtest,
911            clock,
912            cache,
913            portfolio,
914        );
915
916        let config = DataActorConfig {
917            actor_id: Some(ActorId::from("TestActor")),
918            ..Default::default()
919        };
920        let actor1 = TestDataActor::new(config.clone());
921        let actor2 = TestDataActor::new(config);
922
923        // First addition should succeed
924        assert!(trader.add_actor(actor1).is_ok());
925        assert_eq!(trader.actor_count(), 1);
926
927        // Second addition should fail
928        let result = trader.add_actor(actor2);
929        assert!(result.is_err());
930        assert!(
931            result
932                .unwrap_err()
933                .to_string()
934                .contains("already registered")
935        );
936        assert_eq!(trader.actor_count(), 1);
937    }
938
939    #[rstest]
940    fn test_add_strategy_success() {
941        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
942            create_trader_components();
943        let trader_id = TraderId::default();
944        let instance_id = UUID4::new();
945
946        let mut trader = Trader::new(
947            trader_id,
948            instance_id,
949            Environment::Backtest,
950            clock,
951            cache,
952            portfolio,
953        );
954
955        let config = StrategyConfig {
956            strategy_id: Some(StrategyId::from("Test-Strategy")),
957            ..Default::default()
958        };
959        let strategy = TestStrategy::new(config);
960        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
961
962        let result = trader.add_strategy(strategy);
963        assert!(result.is_ok());
964        assert_eq!(trader.strategy_count(), 1);
965        assert_eq!(trader.component_count(), 1);
966        assert!(trader.strategy_ids().contains(&strategy_id));
967    }
968
969    #[rstest]
970    fn test_add_exec_algorithm_success() {
971        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
972            create_trader_components();
973        let trader_id = TraderId::default();
974        let instance_id = UUID4::new();
975
976        let mut trader = Trader::new(
977            trader_id,
978            instance_id,
979            Environment::Backtest,
980            clock,
981            cache,
982            portfolio,
983        );
984
985        let config = DataActorConfig {
986            actor_id: Some(ActorId::from("TestExecAlgorithm")),
987            ..Default::default()
988        };
989        let exec_algorithm = TestDataActor::new(config);
990        let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
991
992        let result = trader.add_exec_algorithm(exec_algorithm);
993        assert!(result.is_ok());
994        assert_eq!(trader.exec_algorithm_count(), 1);
995        assert_eq!(trader.component_count(), 1);
996        assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
997    }
998
999    #[rstest]
1000    fn test_component_lifecycle() {
1001        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1002            create_trader_components();
1003        let trader_id = TraderId::default();
1004        let instance_id = UUID4::new();
1005
1006        let mut trader = Trader::new(
1007            trader_id,
1008            instance_id,
1009            Environment::Backtest,
1010            clock,
1011            cache,
1012            portfolio,
1013        );
1014
1015        // Add components
1016        let actor = TestDataActor::new(DataActorConfig::default());
1017
1018        let strategy_config = StrategyConfig {
1019            strategy_id: Some(StrategyId::from("Test-Strategy")),
1020            ..Default::default()
1021        };
1022        let strategy = TestStrategy::new(strategy_config);
1023
1024        let exec_algorithm_config = DataActorConfig {
1025            actor_id: Some(ActorId::from("TestExecAlgorithm")),
1026            ..Default::default()
1027        };
1028        let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1029
1030        assert!(trader.add_actor(actor).is_ok());
1031        assert!(trader.add_strategy(strategy).is_ok());
1032        assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1033        assert_eq!(trader.component_count(), 3);
1034
1035        // Test start components
1036        let start_result = trader.start_components();
1037        assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1038
1039        // Test stop components
1040        assert!(trader.stop_components().is_ok());
1041
1042        // Test reset components
1043        assert!(trader.reset_components().is_ok());
1044
1045        // Test dispose components
1046        assert!(trader.dispose_components().is_ok());
1047        assert_eq!(trader.component_count(), 0);
1048    }
1049
1050    #[rstest]
1051    fn test_trader_component_lifecycle() {
1052        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1053            create_trader_components();
1054        let trader_id = TraderId::default();
1055        let instance_id = UUID4::new();
1056
1057        let mut trader = Trader::new(
1058            trader_id,
1059            instance_id,
1060            Environment::Backtest,
1061            clock,
1062            cache,
1063            portfolio,
1064        );
1065
1066        // Initially pre-initialized
1067        assert_eq!(trader.state(), ComponentState::PreInitialized);
1068        assert!(!trader.is_running());
1069        assert!(!trader.is_stopped());
1070        assert!(!trader.is_disposed());
1071
1072        // Cannot start from pre-initialized state
1073        assert!(trader.start().is_err());
1074
1075        // Simulate initialization (normally done by kernel)
1076        trader.initialize().unwrap();
1077
1078        // Test start
1079        assert!(trader.start().is_ok());
1080        assert_eq!(trader.state(), ComponentState::Running);
1081        assert!(trader.is_running());
1082        assert!(trader.ts_started().is_some());
1083
1084        // Test stop
1085        assert!(trader.stop().is_ok());
1086        assert_eq!(trader.state(), ComponentState::Stopped);
1087        assert!(trader.is_stopped());
1088        assert!(trader.ts_stopped().is_some());
1089
1090        // Test reset
1091        assert!(trader.reset().is_ok());
1092        assert_eq!(trader.state(), ComponentState::Ready);
1093        assert!(trader.ts_started().is_none());
1094        assert!(trader.ts_stopped().is_none());
1095
1096        // Test dispose
1097        assert!(trader.dispose().is_ok());
1098        assert_eq!(trader.state(), ComponentState::Disposed);
1099        assert!(trader.is_disposed());
1100    }
1101
1102    #[rstest]
1103    fn test_cannot_add_components_while_running() {
1104        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1105            create_trader_components();
1106        let trader_id = TraderId::default();
1107        let instance_id = UUID4::new();
1108
1109        let mut trader = Trader::new(
1110            trader_id,
1111            instance_id,
1112            Environment::Backtest,
1113            clock,
1114            cache,
1115            portfolio,
1116        );
1117
1118        // Simulate running state
1119        trader.state = ComponentState::Running;
1120
1121        let actor = TestDataActor::new(DataActorConfig::default());
1122        let result = trader.add_actor(actor);
1123        assert!(result.is_err());
1124        assert!(
1125            result
1126                .unwrap_err()
1127                .to_string()
1128                .contains("while trader is running")
1129        );
1130    }
1131
1132    #[rstest]
1133    fn test_create_component_clock_backtest_vs_live() {
1134        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1135            create_trader_components();
1136        let trader_id = TraderId::default();
1137        let instance_id = UUID4::new();
1138
1139        // Test backtest environment - should create individual test clocks
1140        let trader_backtest = Trader::new(
1141            trader_id,
1142            instance_id,
1143            Environment::Backtest,
1144            clock.clone(),
1145            cache.clone(),
1146            portfolio.clone(),
1147        );
1148
1149        let backtest_clock = trader_backtest.create_component_clock();
1150        // In backtest, component clock should be different from system clock
1151        assert_ne!(
1152            backtest_clock.as_ptr() as *const _,
1153            clock.as_ptr() as *const _
1154        );
1155
1156        // Test live environment - should share system clock
1157        let trader_live = Trader::new(
1158            trader_id,
1159            instance_id,
1160            Environment::Live,
1161            clock.clone(),
1162            cache,
1163            portfolio,
1164        );
1165
1166        let live_clock = trader_live.create_component_clock();
1167        // In live, component clock should be same as system clock
1168        assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1169    }
1170}