Skip to main content

nautilus_system/
trader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Central orchestrator for managing actors, strategies, and execution algorithms.
17//!
18//! The `Trader` component serves as the primary coordination layer between the kernel
19//! and individual trading components. It manages component lifecycles, provides
20//! unique identification, and coordinates with system engines.
21
22use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
23
24use nautilus_common::{
25    actor::{DataActor, registry::try_get_actor_unchecked},
26    cache::Cache,
27    clock::{Clock, TestClock},
28    component::{
29        Component, dispose_component, register_component_actor, reset_component, start_component,
30        stop_component,
31    },
32    enums::{ComponentState, ComponentTrigger, Environment},
33    msgbus,
34    msgbus::{
35        handler::{ShareableMessageHandler, TypedMessageHandler},
36        switchboard::{get_event_orders_topic, get_event_positions_topic},
37    },
38    timer::{TimeEvent, TimeEventCallback},
39};
40use nautilus_core::{UUID4, UnixNanos};
41use nautilus_model::{
42    events::{OrderEventAny, PositionEvent},
43    identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
44};
45use nautilus_portfolio::portfolio::Portfolio;
46use nautilus_trading::strategy::Strategy;
47
48/// Central orchestrator for managing trading components.
49///
50/// The `Trader` manages the lifecycle and coordination of actors, strategies,
51/// and execution algorithms within the trading system. It provides component
52/// registration, state management, and integration with system engines.
53pub struct Trader {
54    /// The unique trader identifier.
55    pub trader_id: TraderId,
56    /// The unique instance identifier.
57    pub instance_id: UUID4,
58    /// The trading environment context.
59    pub environment: Environment,
60    /// Component state for lifecycle management.
61    state: ComponentState,
62    /// System clock for timestamping.
63    clock: Rc<RefCell<dyn Clock>>,
64    /// System cache for data storage.
65    cache: Rc<RefCell<Cache>>,
66    /// Portfolio reference for strategy registration.
67    portfolio: Rc<RefCell<Portfolio>>,
68    /// Registered actor IDs (actors stored in global registry).
69    actor_ids: Vec<ActorId>,
70    /// Registered strategy IDs (strategies stored in global registry).
71    strategy_ids: Vec<StrategyId>,
72    /// Registered exec algorithm IDs (algorithms stored in global registry).
73    exec_algorithm_ids: Vec<ExecAlgorithmId>,
74    /// Component clocks for individual components.
75    clocks: HashMap<ComponentId, Rc<RefCell<dyn Clock>>>, // TODO: TBD global clock?
76    /// Timestamp when the trader was created.
77    ts_created: UnixNanos,
78    /// Timestamp when the trader was last started.
79    ts_started: Option<UnixNanos>,
80    /// Timestamp when the trader was last stopped.
81    ts_stopped: Option<UnixNanos>,
82}
83
84impl Debug for Trader {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{:?}", stringify!(TraderId)) // TODO
87    }
88}
89
90impl Trader {
91    /// Creates a new [`Trader`] instance.
92    #[must_use]
93    pub fn new(
94        trader_id: TraderId,
95        instance_id: UUID4,
96        environment: Environment,
97        clock: Rc<RefCell<dyn Clock>>,
98        cache: Rc<RefCell<Cache>>,
99        portfolio: Rc<RefCell<Portfolio>>,
100    ) -> Self {
101        let ts_created = clock.borrow().timestamp_ns();
102
103        Self {
104            trader_id,
105            instance_id,
106            environment,
107            state: ComponentState::PreInitialized,
108            clock,
109            cache,
110            portfolio,
111            actor_ids: Vec::new(),
112            strategy_ids: Vec::new(),
113            exec_algorithm_ids: Vec::new(),
114            clocks: HashMap::new(),
115            ts_created,
116            ts_started: None,
117            ts_stopped: None,
118        }
119    }
120
121    /// Returns the trader ID.
122    #[must_use]
123    pub const fn trader_id(&self) -> TraderId {
124        self.trader_id
125    }
126
127    /// Returns the instance ID.
128    #[must_use]
129    pub const fn instance_id(&self) -> UUID4 {
130        self.instance_id
131    }
132
133    /// Returns the trading environment.
134    #[must_use]
135    pub const fn environment(&self) -> Environment {
136        self.environment
137    }
138
139    /// Returns the current component state.
140    #[must_use]
141    pub const fn state(&self) -> ComponentState {
142        self.state
143    }
144
145    /// Returns the timestamp when the trader was created (UNIX nanoseconds).
146    #[must_use]
147    pub const fn ts_created(&self) -> UnixNanos {
148        self.ts_created
149    }
150
151    /// Returns the timestamp when the trader was last started (UNIX nanoseconds).
152    #[must_use]
153    pub const fn ts_started(&self) -> Option<UnixNanos> {
154        self.ts_started
155    }
156
157    /// Returns the timestamp when the trader was last stopped (UNIX nanoseconds).
158    #[must_use]
159    pub const fn ts_stopped(&self) -> Option<UnixNanos> {
160        self.ts_stopped
161    }
162
163    /// Returns the number of registered actors.
164    #[must_use]
165    pub const fn actor_count(&self) -> usize {
166        self.actor_ids.len()
167    }
168
169    /// Returns the number of registered strategies.
170    #[must_use]
171    pub const fn strategy_count(&self) -> usize {
172        self.strategy_ids.len()
173    }
174
175    /// Returns the number of registered execution algorithms.
176    #[must_use]
177    pub const fn exec_algorithm_count(&self) -> usize {
178        self.exec_algorithm_ids.len()
179    }
180
181    /// Returns the total number of registered components.
182    #[must_use]
183    pub const fn component_count(&self) -> usize {
184        self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
185    }
186
187    /// Returns a list of all registered actor IDs.
188    #[must_use]
189    pub fn actor_ids(&self) -> Vec<ActorId> {
190        self.actor_ids.clone()
191    }
192
193    /// Returns a list of all registered strategy IDs.
194    #[must_use]
195    pub fn strategy_ids(&self) -> Vec<StrategyId> {
196        self.strategy_ids.clone()
197    }
198
199    /// Returns a list of all registered execution algorithm IDs.
200    #[must_use]
201    pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
202        self.exec_algorithm_ids.clone()
203    }
204
205    /// Creates a clock for a component.
206    ///
207    /// Creates a test clock in backtest environment, otherwise returns a reference
208    /// to the system clock.
209    fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
210        match self.environment {
211            Environment::Backtest => {
212                // Create individual test clock for component in backtest
213                Rc::new(RefCell::new(TestClock::new()))
214            }
215            Environment::Live | Environment::Sandbox => {
216                // Share system clock in live environments
217                self.clock.clone()
218            }
219        }
220    }
221
222    /// Adds an actor to the trader.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if:
227    /// - The trader is not in a valid state for adding components.
228    /// - An actor with the same ID is already registered.
229    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
230    where
231        T: DataActor + Component + Debug + 'static,
232    {
233        self.validate_component_registration()?;
234
235        let actor_id = actor.actor_id();
236
237        // Check for duplicate registration
238        if self.actor_ids.contains(&actor_id) {
239            anyhow::bail!("Actor '{actor_id}' is already registered");
240        }
241
242        let clock = self.create_component_clock();
243        let component_id = ComponentId::new(actor_id.inner().as_str());
244        self.clocks.insert(component_id, clock.clone());
245
246        let mut actor_mut = actor;
247        actor_mut.register(self.trader_id, clock, self.cache.clone())?;
248
249        self.add_registered_actor(actor_mut)
250    }
251
252    /// Adds an actor to the trader using a factory function.
253    ///
254    /// The factory function is called at registration time to create the actor,
255    /// avoiding cloning issues with non-cloneable actor types.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if:
260    /// - The factory function fails to create the actor.
261    /// - The trader is not in a valid state for adding components.
262    /// - An actor with the same ID is already registered.
263    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
264    where
265        F: FnOnce() -> anyhow::Result<T>,
266        T: DataActor + Component + Debug + 'static,
267    {
268        let actor = factory()?;
269
270        self.add_actor(actor)
271    }
272
273    /// Adds an already registered actor to the trader's component registry.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the actor cannot be registered in the component registry.
278    pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
279    where
280        T: DataActor + Component + Debug + 'static,
281    {
282        let actor_id = actor.actor_id();
283
284        // Register in both component and actor registries (this consumes the actor)
285        register_component_actor(actor);
286
287        // Store actor ID for lifecycle management
288        self.actor_ids.push(actor_id);
289
290        log::info!(
291            "Registered actor '{actor_id}' with trader {}",
292            self.trader_id
293        );
294
295        Ok(())
296    }
297
298    /// Adds an actor ID to the trader's lifecycle management without consuming the actor.
299    ///
300    /// This is useful when the actor is already registered in the global component registry
301    /// but the trader needs to track it for lifecycle management. The caller is responsible
302    /// for ensuring the actor is properly registered in the global registries.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the actor ID is already tracked by this trader.
307    pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
308        // Check for duplicate registration
309        if self.actor_ids.contains(&actor_id) {
310            anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
311        }
312
313        // Store actor ID for lifecycle management
314        self.actor_ids.push(actor_id);
315
316        log::debug!(
317            "Added actor ID '{actor_id}' to trader {} for lifecycle management",
318            self.trader_id
319        );
320
321        Ok(())
322    }
323
324    /// Adds a strategy to the trader.
325    ///
326    /// Strategies are registered in both the component registry (for lifecycle management)
327    /// and the actor registry (for data callbacks via msgbus). The strategy's `StrategyCore`
328    /// is also registered with the portfolio for order management.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if:
333    /// - The trader is not in a valid state for adding components.
334    /// - A strategy with the same ID is already registered.
335    pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
336    where
337        T: Strategy + Component + Debug + 'static,
338    {
339        self.validate_component_registration()?;
340
341        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
342
343        // Check for duplicate registration
344        if self.strategy_ids.contains(&strategy_id) {
345            anyhow::bail!("Strategy '{strategy_id}' is already registered");
346        }
347
348        let clock = self.create_component_clock();
349        let component_id = strategy.component_id();
350        self.clocks.insert(component_id, clock.clone());
351
352        // Register strategy core with portfolio for order management
353        strategy.core_mut().register(
354            self.trader_id,
355            clock.clone(),
356            self.cache.clone(),
357            self.portfolio.clone(),
358        )?;
359
360        // Register default time event handler for this strategy
361        let actor_id = strategy.actor_id().inner();
362        let callback = TimeEventCallback::from(move |event: TimeEvent| {
363            if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
364                actor.handle_time_event(&event);
365            } else {
366                log::error!("Strategy {actor_id} not found for time event handling");
367            }
368        });
369        clock.borrow_mut().register_default_handler(callback);
370
371        // Transition to Ready state
372        strategy.initialize()?;
373
374        // Register in both component and actor registries
375        register_component_actor(strategy);
376
377        let order_topic = get_event_orders_topic(strategy_id);
378        let order_actor_id = actor_id;
379        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
380            move |event: &OrderEventAny| {
381                if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
382                    strategy.handle_order_event(event.clone());
383                } else {
384                    log::error!("Strategy {order_actor_id} not found for order event handling");
385                }
386            },
387        )));
388        msgbus::subscribe_topic(order_topic, handler, None);
389
390        let position_topic = get_event_positions_topic(strategy_id);
391        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
392            move |event: &PositionEvent| {
393                if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
394                    strategy.handle_position_event(event.clone());
395                } else {
396                    log::error!("Strategy {actor_id} not found for position event handling");
397                }
398            },
399        )));
400        msgbus::subscribe_topic(position_topic, handler, None);
401
402        self.strategy_ids.push(strategy_id);
403
404        log::info!(
405            "Registered strategy {strategy_id} with trader {}",
406            self.trader_id
407        );
408
409        Ok(())
410    }
411
412    /// Adds an execution algorithm to the trader.
413    ///
414    /// Execution algorithms are registered in both the component registry (for lifecycle
415    /// management) and the actor registry (for data callbacks via msgbus).
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if:
420    /// - The trader is not in a valid state for adding components.
421    /// - An execution algorithm with the same ID is already registered.
422    pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
423    where
424        T: DataActor + Component + Debug + 'static,
425    {
426        self.validate_component_registration()?;
427
428        let exec_algorithm_id =
429            ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
430
431        // Check for duplicate registration
432        if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
433            anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
434        }
435
436        let clock = self.create_component_clock();
437        let component_id = exec_algorithm.component_id();
438        self.clocks.insert(component_id, clock.clone());
439
440        exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
441
442        // Register in both component and actor registries
443        register_component_actor(exec_algorithm);
444
445        self.exec_algorithm_ids.push(exec_algorithm_id);
446
447        log::info!(
448            "Registered execution algorithm '{exec_algorithm_id}' with trader {}",
449            self.trader_id
450        );
451
452        Ok(())
453    }
454
455    /// Validates that the trader is in a valid state for component registration.
456    fn validate_component_registration(&self) -> anyhow::Result<()> {
457        match self.state {
458            ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
459                Ok(())
460            }
461            ComponentState::Running => {
462                anyhow::bail!("Cannot add components while trader is running")
463            }
464            ComponentState::Disposed => {
465                anyhow::bail!("Cannot add components to disposed trader")
466            }
467            _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
468        }
469    }
470
471    /// Starts all registered components.
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if any component fails to start.
476    pub fn start_components(&mut self) -> anyhow::Result<()> {
477        for actor_id in &self.actor_ids {
478            log::debug!("Starting actor {actor_id}");
479            start_component(&actor_id.inner())?;
480        }
481
482        for strategy_id in &self.strategy_ids {
483            log::debug!("Starting strategy {strategy_id}");
484            start_component(&strategy_id.inner())?;
485        }
486
487        for exec_algorithm_id in &self.exec_algorithm_ids {
488            log::debug!("Starting execution algorithm {exec_algorithm_id}");
489            start_component(&exec_algorithm_id.inner())?;
490        }
491
492        Ok(())
493    }
494
495    /// Stops all registered components.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if any component fails to stop.
500    pub fn stop_components(&mut self) -> anyhow::Result<()> {
501        for actor_id in &self.actor_ids {
502            log::debug!("Stopping actor {actor_id}");
503            stop_component(&actor_id.inner())?;
504        }
505
506        for exec_algorithm_id in &self.exec_algorithm_ids {
507            log::debug!("Stopping execution algorithm {exec_algorithm_id}");
508            stop_component(&exec_algorithm_id.inner())?;
509        }
510
511        for strategy_id in &self.strategy_ids {
512            log::debug!("Stopping strategy {strategy_id}");
513            stop_component(&strategy_id.inner())?;
514        }
515
516        Ok(())
517    }
518
519    /// Resets all registered components.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if any component fails to reset.
524    pub fn reset_components(&mut self) -> anyhow::Result<()> {
525        for actor_id in &self.actor_ids {
526            log::debug!("Resetting actor {actor_id}");
527            reset_component(&actor_id.inner())?;
528        }
529
530        for strategy_id in &self.strategy_ids {
531            log::debug!("Resetting strategy {strategy_id}");
532            reset_component(&strategy_id.inner())?;
533        }
534
535        for exec_algorithm_id in &self.exec_algorithm_ids {
536            log::debug!("Resetting execution algorithm {exec_algorithm_id}");
537            reset_component(&exec_algorithm_id.inner())?;
538        }
539
540        Ok(())
541    }
542
543    /// Disposes of all registered components.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if any component fails to dispose.
548    pub fn dispose_components(&mut self) -> anyhow::Result<()> {
549        for actor_id in &self.actor_ids {
550            log::debug!("Disposing actor {actor_id}");
551            dispose_component(&actor_id.inner())?;
552        }
553
554        for strategy_id in &self.strategy_ids {
555            log::debug!("Disposing strategy {strategy_id}");
556            dispose_component(&strategy_id.inner())?;
557        }
558
559        for exec_algorithm_id in &self.exec_algorithm_ids {
560            log::debug!("Disposing execution algorithm {exec_algorithm_id}");
561            dispose_component(&exec_algorithm_id.inner())?;
562        }
563
564        self.actor_ids.clear();
565        self.strategy_ids.clear();
566        self.exec_algorithm_ids.clear();
567        self.clocks.clear();
568
569        Ok(())
570    }
571
572    /// Initializes the trader, transitioning from `PreInitialized` to `Ready` state.
573    ///
574    /// This method must be called before starting the trader.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if the trader cannot be initialized from its current state.
579    pub fn initialize(&mut self) -> anyhow::Result<()> {
580        let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
581        self.state = new_state;
582
583        Ok(())
584    }
585
586    fn on_start(&mut self) -> anyhow::Result<()> {
587        self.start_components()?;
588
589        // Transition to running state
590        self.ts_started = Some(self.clock.borrow().timestamp_ns());
591
592        Ok(())
593    }
594
595    fn on_stop(&mut self) -> anyhow::Result<()> {
596        self.stop_components()?;
597
598        self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
599
600        Ok(())
601    }
602
603    fn on_reset(&mut self) -> anyhow::Result<()> {
604        self.reset_components()?;
605
606        self.ts_started = None;
607        self.ts_stopped = None;
608
609        Ok(())
610    }
611
612    fn on_dispose(&mut self) -> anyhow::Result<()> {
613        if self.is_running() {
614            self.stop()?;
615        }
616
617        self.dispose_components()?;
618
619        Ok(())
620    }
621}
622
623impl Component for Trader {
624    fn component_id(&self) -> ComponentId {
625        ComponentId::new(format!("Trader-{}", self.trader_id))
626    }
627
628    fn state(&self) -> ComponentState {
629        self.state
630    }
631
632    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
633        self.state = self.state.transition(&trigger)?;
634        log::info!("{}", self.state.variant_name());
635        Ok(())
636    }
637
638    fn register(
639        &mut self,
640        _trader_id: TraderId,
641        _clock: Rc<RefCell<dyn Clock>>,
642        _cache: Rc<RefCell<Cache>>,
643    ) -> anyhow::Result<()> {
644        anyhow::bail!("Trader cannot register with itself")
645    }
646
647    fn on_start(&mut self) -> anyhow::Result<()> {
648        Self::on_start(self)
649    }
650
651    fn on_stop(&mut self) -> anyhow::Result<()> {
652        Self::on_stop(self)
653    }
654
655    fn on_reset(&mut self) -> anyhow::Result<()> {
656        Self::on_reset(self)
657    }
658
659    fn on_dispose(&mut self) -> anyhow::Result<()> {
660        Self::on_dispose(self)
661    }
662}
663
664#[cfg(test)]
665mod tests {
666    use std::{
667        cell::RefCell,
668        ops::{Deref, DerefMut},
669        rc::Rc,
670    };
671
672    use nautilus_common::{
673        actor::{DataActorCore, data_actor::DataActorConfig},
674        cache::Cache,
675        clock::TestClock,
676        enums::{ComponentState, Environment},
677        msgbus::MessageBus,
678    };
679    use nautilus_core::UUID4;
680    use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
681    use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
682    use nautilus_model::{
683        identifiers::{ActorId, ComponentId, TraderId},
684        stubs::TestDefault,
685    };
686    use nautilus_portfolio::portfolio::Portfolio;
687    use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
688    use nautilus_trading::strategy::{
689        Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
690    };
691    use rstest::rstest;
692
693    use super::*;
694
695    // Simple DataActor wrapper for testing
696    #[derive(Debug)]
697    struct TestDataActor {
698        core: DataActorCore,
699    }
700
701    impl TestDataActor {
702        fn new(config: DataActorConfig) -> Self {
703            Self {
704                core: DataActorCore::new(config),
705            }
706        }
707    }
708
709    impl DataActor for TestDataActor {}
710
711    impl Deref for TestDataActor {
712        type Target = DataActorCore;
713        fn deref(&self) -> &Self::Target {
714            &self.core
715        }
716    }
717
718    impl DerefMut for TestDataActor {
719        fn deref_mut(&mut self) -> &mut Self::Target {
720            &mut self.core
721        }
722    }
723
724    // Simple Strategy wrapper for testing
725    #[derive(Debug)]
726    struct TestStrategy {
727        core: StrategyCore,
728    }
729
730    impl TestStrategy {
731        fn new(config: StrategyConfig) -> Self {
732            Self {
733                core: StrategyCore::new(config),
734            }
735        }
736    }
737
738    impl DataActor for TestStrategy {}
739
740    // Deref through StrategyCore to DataActorCore
741    impl Deref for TestStrategy {
742        type Target = DataActorCore;
743        fn deref(&self) -> &Self::Target {
744            &self.core
745        }
746    }
747
748    impl DerefMut for TestStrategy {
749        fn deref_mut(&mut self) -> &mut Self::Target {
750            &mut self.core
751        }
752    }
753
754    impl StrategyTrait for TestStrategy {
755        fn core_mut(&mut self) -> &mut StrategyCore {
756            &mut self.core
757        }
758    }
759
760    #[allow(clippy::type_complexity)]
761    fn create_trader_components() -> (
762        Rc<RefCell<MessageBus>>,
763        Rc<RefCell<Cache>>,
764        Rc<RefCell<Portfolio>>,
765        Rc<RefCell<DataEngine>>,
766        Rc<RefCell<RiskEngine>>,
767        Rc<RefCell<ExecutionEngine>>,
768        Rc<RefCell<TestClock>>,
769    ) {
770        let trader_id = TraderId::test_default();
771        let instance_id = UUID4::new();
772        let clock = Rc::new(RefCell::new(TestClock::new()));
773        // Set the clock to a non-zero time for test purposes
774        clock.borrow_mut().set_time(1_000_000_000u64.into());
775        let msgbus = Rc::new(RefCell::new(MessageBus::new(
776            trader_id,
777            instance_id,
778            Some("test".to_string()),
779            None,
780        )));
781        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
782        let portfolio = Rc::new(RefCell::new(Portfolio::new(
783            cache.clone(),
784            clock.clone() as Rc<RefCell<dyn Clock>>,
785            None,
786        )));
787        let data_engine = Rc::new(RefCell::new(DataEngine::new(
788            clock.clone(),
789            cache.clone(),
790            Some(DataEngineConfig::default()),
791        )));
792
793        // Create separate cache and clock instances for RiskEngine to avoid borrowing conflicts
794        let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
795        let risk_clock = Rc::new(RefCell::new(TestClock::new()));
796        let risk_portfolio = Portfolio::new(
797            risk_cache.clone(),
798            risk_clock.clone() as Rc<RefCell<dyn Clock>>,
799            None,
800        );
801        let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
802            RiskEngineConfig::default(),
803            risk_portfolio,
804            risk_clock as Rc<RefCell<dyn Clock>>,
805            risk_cache,
806        )));
807        let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
808            clock.clone(),
809            cache.clone(),
810            Some(ExecutionEngineConfig::default()),
811        )));
812
813        (
814            msgbus,
815            cache,
816            portfolio,
817            data_engine,
818            risk_engine,
819            exec_engine,
820            clock,
821        )
822    }
823
824    #[rstest]
825    fn test_trader_creation() {
826        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
827            create_trader_components();
828        let trader_id = TraderId::test_default();
829        let instance_id = UUID4::new();
830
831        let trader = Trader::new(
832            trader_id,
833            instance_id,
834            Environment::Backtest,
835            clock,
836            cache,
837            portfolio,
838        );
839
840        assert_eq!(trader.trader_id(), trader_id);
841        assert_eq!(trader.instance_id(), instance_id);
842        assert_eq!(trader.environment(), Environment::Backtest);
843        assert_eq!(trader.state(), ComponentState::PreInitialized);
844        assert_eq!(trader.actor_count(), 0);
845        assert_eq!(trader.strategy_count(), 0);
846        assert_eq!(trader.exec_algorithm_count(), 0);
847        assert_eq!(trader.component_count(), 0);
848        assert!(!trader.is_running());
849        assert!(!trader.is_stopped());
850        assert!(!trader.is_disposed());
851        assert!(trader.ts_created() > 0);
852        assert!(trader.ts_started().is_none());
853        assert!(trader.ts_stopped().is_none());
854    }
855
856    #[rstest]
857    fn test_trader_component_id() {
858        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
859            create_trader_components();
860        let trader_id = TraderId::from("TRADER-001");
861        let instance_id = UUID4::new();
862
863        let trader = Trader::new(
864            trader_id,
865            instance_id,
866            Environment::Backtest,
867            clock,
868            cache,
869            portfolio,
870        );
871
872        assert_eq!(
873            trader.component_id(),
874            ComponentId::from("Trader-TRADER-001")
875        );
876    }
877
878    #[rstest]
879    fn test_add_actor_success() {
880        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
881            create_trader_components();
882        let trader_id = TraderId::test_default();
883        let instance_id = UUID4::new();
884
885        let mut trader = Trader::new(
886            trader_id,
887            instance_id,
888            Environment::Backtest,
889            clock,
890            cache,
891            portfolio,
892        );
893
894        let actor = TestDataActor::new(DataActorConfig::default());
895        let actor_id = actor.actor_id();
896
897        let result = trader.add_actor(actor);
898        assert!(result.is_ok());
899        assert_eq!(trader.actor_count(), 1);
900        assert_eq!(trader.component_count(), 1);
901        assert!(trader.actor_ids().contains(&actor_id));
902    }
903
904    #[rstest]
905    fn test_add_duplicate_actor_fails() {
906        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
907            create_trader_components();
908        let trader_id = TraderId::test_default();
909        let instance_id = UUID4::new();
910
911        let mut trader = Trader::new(
912            trader_id,
913            instance_id,
914            Environment::Backtest,
915            clock,
916            cache,
917            portfolio,
918        );
919
920        let config = DataActorConfig {
921            actor_id: Some(ActorId::from("TestActor")),
922            ..Default::default()
923        };
924        let actor1 = TestDataActor::new(config.clone());
925        let actor2 = TestDataActor::new(config);
926
927        // First addition should succeed
928        assert!(trader.add_actor(actor1).is_ok());
929        assert_eq!(trader.actor_count(), 1);
930
931        // Second addition should fail
932        let result = trader.add_actor(actor2);
933        assert!(result.is_err());
934        assert!(
935            result
936                .unwrap_err()
937                .to_string()
938                .contains("already registered")
939        );
940        assert_eq!(trader.actor_count(), 1);
941    }
942
943    #[rstest]
944    fn test_add_strategy_success() {
945        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
946            create_trader_components();
947        let trader_id = TraderId::test_default();
948        let instance_id = UUID4::new();
949
950        let mut trader = Trader::new(
951            trader_id,
952            instance_id,
953            Environment::Backtest,
954            clock,
955            cache,
956            portfolio,
957        );
958
959        let config = StrategyConfig {
960            strategy_id: Some(StrategyId::from("Test-Strategy")),
961            ..Default::default()
962        };
963        let strategy = TestStrategy::new(config);
964        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
965
966        let result = trader.add_strategy(strategy);
967        assert!(result.is_ok());
968        assert_eq!(trader.strategy_count(), 1);
969        assert_eq!(trader.component_count(), 1);
970        assert!(trader.strategy_ids().contains(&strategy_id));
971    }
972
973    #[rstest]
974    fn test_add_exec_algorithm_success() {
975        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
976            create_trader_components();
977        let trader_id = TraderId::test_default();
978        let instance_id = UUID4::new();
979
980        let mut trader = Trader::new(
981            trader_id,
982            instance_id,
983            Environment::Backtest,
984            clock,
985            cache,
986            portfolio,
987        );
988
989        let config = DataActorConfig {
990            actor_id: Some(ActorId::from("TestExecAlgorithm")),
991            ..Default::default()
992        };
993        let exec_algorithm = TestDataActor::new(config);
994        let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
995
996        let result = trader.add_exec_algorithm(exec_algorithm);
997        assert!(result.is_ok());
998        assert_eq!(trader.exec_algorithm_count(), 1);
999        assert_eq!(trader.component_count(), 1);
1000        assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
1001    }
1002
1003    #[rstest]
1004    fn test_component_lifecycle() {
1005        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1006            create_trader_components();
1007        let trader_id = TraderId::test_default();
1008        let instance_id = UUID4::new();
1009
1010        let mut trader = Trader::new(
1011            trader_id,
1012            instance_id,
1013            Environment::Backtest,
1014            clock,
1015            cache,
1016            portfolio,
1017        );
1018
1019        // Add components
1020        let actor = TestDataActor::new(DataActorConfig::default());
1021
1022        let strategy_config = StrategyConfig {
1023            strategy_id: Some(StrategyId::from("Test-Strategy")),
1024            ..Default::default()
1025        };
1026        let strategy = TestStrategy::new(strategy_config);
1027
1028        let exec_algorithm_config = DataActorConfig {
1029            actor_id: Some(ActorId::from("TestExecAlgorithm")),
1030            ..Default::default()
1031        };
1032        let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1033
1034        assert!(trader.add_actor(actor).is_ok());
1035        assert!(trader.add_strategy(strategy).is_ok());
1036        assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1037        assert_eq!(trader.component_count(), 3);
1038
1039        // Test start components
1040        let start_result = trader.start_components();
1041        assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1042
1043        // Test stop components
1044        assert!(trader.stop_components().is_ok());
1045
1046        // Test reset components
1047        assert!(trader.reset_components().is_ok());
1048
1049        // Test dispose components
1050        assert!(trader.dispose_components().is_ok());
1051        assert_eq!(trader.component_count(), 0);
1052    }
1053
1054    #[rstest]
1055    fn test_trader_component_lifecycle() {
1056        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1057            create_trader_components();
1058        let trader_id = TraderId::test_default();
1059        let instance_id = UUID4::new();
1060
1061        let mut trader = Trader::new(
1062            trader_id,
1063            instance_id,
1064            Environment::Backtest,
1065            clock,
1066            cache,
1067            portfolio,
1068        );
1069
1070        // Initially pre-initialized
1071        assert_eq!(trader.state(), ComponentState::PreInitialized);
1072        assert!(!trader.is_running());
1073        assert!(!trader.is_stopped());
1074        assert!(!trader.is_disposed());
1075
1076        // Cannot start from pre-initialized state
1077        assert!(trader.start().is_err());
1078
1079        // Simulate initialization (normally done by kernel)
1080        trader.initialize().unwrap();
1081
1082        // Test start
1083        assert!(trader.start().is_ok());
1084        assert_eq!(trader.state(), ComponentState::Running);
1085        assert!(trader.is_running());
1086        assert!(trader.ts_started().is_some());
1087
1088        // Test stop
1089        assert!(trader.stop().is_ok());
1090        assert_eq!(trader.state(), ComponentState::Stopped);
1091        assert!(trader.is_stopped());
1092        assert!(trader.ts_stopped().is_some());
1093
1094        // Test reset
1095        assert!(trader.reset().is_ok());
1096        assert_eq!(trader.state(), ComponentState::Ready);
1097        assert!(trader.ts_started().is_none());
1098        assert!(trader.ts_stopped().is_none());
1099
1100        // Test dispose
1101        assert!(trader.dispose().is_ok());
1102        assert_eq!(trader.state(), ComponentState::Disposed);
1103        assert!(trader.is_disposed());
1104    }
1105
1106    #[rstest]
1107    fn test_cannot_add_components_while_running() {
1108        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1109            create_trader_components();
1110        let trader_id = TraderId::test_default();
1111        let instance_id = UUID4::new();
1112
1113        let mut trader = Trader::new(
1114            trader_id,
1115            instance_id,
1116            Environment::Backtest,
1117            clock,
1118            cache,
1119            portfolio,
1120        );
1121
1122        // Simulate running state
1123        trader.state = ComponentState::Running;
1124
1125        let actor = TestDataActor::new(DataActorConfig::default());
1126        let result = trader.add_actor(actor);
1127        assert!(result.is_err());
1128        assert!(
1129            result
1130                .unwrap_err()
1131                .to_string()
1132                .contains("while trader is running")
1133        );
1134    }
1135
1136    #[rstest]
1137    fn test_create_component_clock_backtest_vs_live() {
1138        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1139            create_trader_components();
1140        let trader_id = TraderId::test_default();
1141        let instance_id = UUID4::new();
1142
1143        // Test backtest environment - should create individual test clocks
1144        let trader_backtest = Trader::new(
1145            trader_id,
1146            instance_id,
1147            Environment::Backtest,
1148            clock.clone(),
1149            cache.clone(),
1150            portfolio.clone(),
1151        );
1152
1153        let backtest_clock = trader_backtest.create_component_clock();
1154        // In backtest, component clock should be different from system clock
1155        assert_ne!(
1156            backtest_clock.as_ptr() as *const _,
1157            clock.as_ptr() as *const _
1158        );
1159
1160        // Test live environment - should share system clock
1161        let trader_live = Trader::new(
1162            trader_id,
1163            instance_id,
1164            Environment::Live,
1165            clock.clone(),
1166            cache,
1167            portfolio,
1168        );
1169
1170        let live_clock = trader_live.create_component_clock();
1171        // In live, component clock should be same as system clock
1172        assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1173    }
1174}