nautilus_system/
trader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Central orchestrator for managing actors, strategies, and execution algorithms.
17//!
18//! The `Trader` component serves as the primary coordination layer between the kernel
19//! and individual trading components. It manages component lifecycles, provides
20//! unique identification, and coordinates with system engines.
21
22use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
23
24use nautilus_common::{
25    actor::{DataActor, registry::try_get_actor_unchecked},
26    cache::Cache,
27    clock::{Clock, TestClock},
28    component::{
29        Component, dispose_component, register_component_actor, reset_component, start_component,
30        stop_component,
31    },
32    enums::{ComponentState, ComponentTrigger, Environment},
33    msgbus,
34    msgbus::{
35        handler::{ShareableMessageHandler, TypedMessageHandler},
36        switchboard::{get_event_orders_topic, get_event_positions_topic},
37    },
38    timer::{TimeEvent, TimeEventCallback},
39};
40use nautilus_core::{UUID4, UnixNanos};
41use nautilus_model::{
42    events::{OrderEventAny, PositionEvent},
43    identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
44};
45use nautilus_portfolio::portfolio::Portfolio;
46use nautilus_trading::strategy::Strategy;
47
48/// Central orchestrator for managing trading components.
49///
50/// The `Trader` manages the lifecycle and coordination of actors, strategies,
51/// and execution algorithms within the trading system. It provides component
52/// registration, state management, and integration with system engines.
53pub struct Trader {
54    /// The unique trader identifier.
55    pub trader_id: TraderId,
56    /// The unique instance identifier.
57    pub instance_id: UUID4,
58    /// The trading environment context.
59    pub environment: Environment,
60    /// Component state for lifecycle management.
61    state: ComponentState,
62    /// System clock for timestamping.
63    clock: Rc<RefCell<dyn Clock>>,
64    /// System cache for data storage.
65    cache: Rc<RefCell<Cache>>,
66    /// Portfolio reference for strategy registration.
67    portfolio: Rc<RefCell<Portfolio>>,
68    /// Registered actor IDs (actors stored in global registry).
69    actor_ids: Vec<ActorId>,
70    /// Registered strategy IDs (strategies stored in global registry).
71    strategy_ids: Vec<StrategyId>,
72    /// Registered exec algorithm IDs (algorithms stored in global registry).
73    exec_algorithm_ids: Vec<ExecAlgorithmId>,
74    /// Component clocks for individual components.
75    clocks: HashMap<ComponentId, Rc<RefCell<dyn Clock>>>, // TODO: TBD global clock?
76    /// Timestamp when the trader was created.
77    ts_created: UnixNanos,
78    /// Timestamp when the trader was last started.
79    ts_started: Option<UnixNanos>,
80    /// Timestamp when the trader was last stopped.
81    ts_stopped: Option<UnixNanos>,
82}
83
84impl Debug for Trader {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{:?}", stringify!(TraderId)) // TODO
87    }
88}
89
90impl Trader {
91    /// Creates a new [`Trader`] instance.
92    #[must_use]
93    pub fn new(
94        trader_id: TraderId,
95        instance_id: UUID4,
96        environment: Environment,
97        clock: Rc<RefCell<dyn Clock>>,
98        cache: Rc<RefCell<Cache>>,
99        portfolio: Rc<RefCell<Portfolio>>,
100    ) -> Self {
101        let ts_created = clock.borrow().timestamp_ns();
102
103        Self {
104            trader_id,
105            instance_id,
106            environment,
107            state: ComponentState::PreInitialized,
108            clock,
109            cache,
110            portfolio,
111            actor_ids: Vec::new(),
112            strategy_ids: Vec::new(),
113            exec_algorithm_ids: Vec::new(),
114            clocks: HashMap::new(),
115            ts_created,
116            ts_started: None,
117            ts_stopped: None,
118        }
119    }
120
121    /// Returns the trader ID.
122    #[must_use]
123    pub const fn trader_id(&self) -> TraderId {
124        self.trader_id
125    }
126
127    /// Returns the instance ID.
128    #[must_use]
129    pub const fn instance_id(&self) -> UUID4 {
130        self.instance_id
131    }
132
133    /// Returns the trading environment.
134    #[must_use]
135    pub const fn environment(&self) -> Environment {
136        self.environment
137    }
138
139    /// Returns the current component state.
140    #[must_use]
141    pub const fn state(&self) -> ComponentState {
142        self.state
143    }
144
145    /// Returns the timestamp when the trader was created (UNIX nanoseconds).
146    #[must_use]
147    pub const fn ts_created(&self) -> UnixNanos {
148        self.ts_created
149    }
150
151    /// Returns the timestamp when the trader was last started (UNIX nanoseconds).
152    #[must_use]
153    pub const fn ts_started(&self) -> Option<UnixNanos> {
154        self.ts_started
155    }
156
157    /// Returns the timestamp when the trader was last stopped (UNIX nanoseconds).
158    #[must_use]
159    pub const fn ts_stopped(&self) -> Option<UnixNanos> {
160        self.ts_stopped
161    }
162
163    /// Returns the number of registered actors.
164    #[must_use]
165    pub const fn actor_count(&self) -> usize {
166        self.actor_ids.len()
167    }
168
169    /// Returns the number of registered strategies.
170    #[must_use]
171    pub const fn strategy_count(&self) -> usize {
172        self.strategy_ids.len()
173    }
174
175    /// Returns the number of registered execution algorithms.
176    #[must_use]
177    pub const fn exec_algorithm_count(&self) -> usize {
178        self.exec_algorithm_ids.len()
179    }
180
181    /// Returns the total number of registered components.
182    #[must_use]
183    pub const fn component_count(&self) -> usize {
184        self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
185    }
186
187    /// Returns a list of all registered actor IDs.
188    #[must_use]
189    pub fn actor_ids(&self) -> Vec<ActorId> {
190        self.actor_ids.clone()
191    }
192
193    /// Returns a list of all registered strategy IDs.
194    #[must_use]
195    pub fn strategy_ids(&self) -> Vec<StrategyId> {
196        self.strategy_ids.clone()
197    }
198
199    /// Returns a list of all registered execution algorithm IDs.
200    #[must_use]
201    pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
202        self.exec_algorithm_ids.clone()
203    }
204
205    /// Creates a clock for a component.
206    ///
207    /// Creates a test clock in backtest environment, otherwise returns a reference
208    /// to the system clock.
209    fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
210        match self.environment {
211            Environment::Backtest => {
212                // Create individual test clock for component in backtest
213                Rc::new(RefCell::new(TestClock::new()))
214            }
215            Environment::Live | Environment::Sandbox => {
216                // Share system clock in live environments
217                self.clock.clone()
218            }
219        }
220    }
221
222    /// Adds an actor to the trader.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if:
227    /// - The trader is not in a valid state for adding components.
228    /// - An actor with the same ID is already registered.
229    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
230    where
231        T: DataActor + Component + Debug + 'static,
232    {
233        self.validate_component_registration()?;
234
235        let actor_id = actor.actor_id();
236
237        // Check for duplicate registration
238        if self.actor_ids.contains(&actor_id) {
239            anyhow::bail!("Actor {actor_id} is already registered");
240        }
241
242        let clock = self.create_component_clock();
243        let component_id = ComponentId::new(actor_id.inner().as_str());
244        self.clocks.insert(component_id, clock.clone());
245
246        let mut actor_mut = actor;
247        actor_mut.register(self.trader_id, clock, self.cache.clone())?;
248
249        self.add_registered_actor(actor_mut)
250    }
251
252    /// Adds an actor to the trader using a factory function.
253    ///
254    /// The factory function is called at registration time to create the actor,
255    /// avoiding cloning issues with non-cloneable actor types.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if:
260    /// - The factory function fails to create the actor.
261    /// - The trader is not in a valid state for adding components.
262    /// - An actor with the same ID is already registered.
263    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
264    where
265        F: FnOnce() -> anyhow::Result<T>,
266        T: DataActor + Component + Debug + 'static,
267    {
268        let actor = factory()?;
269
270        self.add_actor(actor)
271    }
272
273    /// Adds an already registered actor to the trader's component registry.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the actor cannot be registered in the component registry.
278    pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
279    where
280        T: DataActor + Component + Debug + 'static,
281    {
282        let actor_id = actor.actor_id();
283
284        // Register in both component and actor registries (this consumes the actor)
285        register_component_actor(actor);
286
287        // Store actor ID for lifecycle management
288        self.actor_ids.push(actor_id);
289
290        log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
291
292        Ok(())
293    }
294
295    /// Adds an actor ID to the trader's lifecycle management without consuming the actor.
296    ///
297    /// This is useful when the actor is already registered in the global component registry
298    /// but the trader needs to track it for lifecycle management. The caller is responsible
299    /// for ensuring the actor is properly registered in the global registries.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if the actor ID is already tracked by this trader.
304    pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
305        // Check for duplicate registration
306        if self.actor_ids.contains(&actor_id) {
307            anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
308        }
309
310        // Store actor ID for lifecycle management
311        self.actor_ids.push(actor_id);
312
313        log::debug!(
314            "Added actor ID '{actor_id}' to trader {} for lifecycle management",
315            self.trader_id
316        );
317
318        Ok(())
319    }
320
321    /// Adds a strategy to the trader.
322    ///
323    /// Strategies are registered in both the component registry (for lifecycle management)
324    /// and the actor registry (for data callbacks via msgbus). The strategy's `StrategyCore`
325    /// is also registered with the portfolio for order management.
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if:
330    /// - The trader is not in a valid state for adding components.
331    /// - A strategy with the same ID is already registered.
332    pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
333    where
334        T: Strategy + Component + Debug + 'static,
335    {
336        self.validate_component_registration()?;
337
338        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
339
340        // Check for duplicate registration
341        if self.strategy_ids.contains(&strategy_id) {
342            anyhow::bail!("Strategy {strategy_id} is already registered");
343        }
344
345        let clock = self.create_component_clock();
346        let component_id = strategy.component_id();
347        self.clocks.insert(component_id, clock.clone());
348
349        // Register strategy core with portfolio for order management
350        strategy.core_mut().register(
351            self.trader_id,
352            clock.clone(),
353            self.cache.clone(),
354            self.portfolio.clone(),
355        )?;
356
357        // Register default time event handler for this strategy
358        let actor_id = strategy.actor_id().inner();
359        let callback = TimeEventCallback::from(move |event: TimeEvent| {
360            if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
361                actor.handle_time_event(&event);
362            } else {
363                log::error!("Strategy {actor_id} not found for time event handling");
364            }
365        });
366        clock.borrow_mut().register_default_handler(callback);
367
368        // Transition to Ready state
369        strategy.initialize()?;
370
371        // Register in both component and actor registries
372        register_component_actor(strategy);
373
374        let order_topic = get_event_orders_topic(strategy_id);
375        let order_actor_id = actor_id;
376        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
377            move |event: &OrderEventAny| {
378                if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
379                    strategy.handle_order_event(event.clone());
380                } else {
381                    log::error!("Strategy {order_actor_id} not found for order event handling");
382                }
383            },
384        )));
385        msgbus::subscribe_topic(order_topic, handler, None);
386
387        let position_topic = get_event_positions_topic(strategy_id);
388        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
389            move |event: &PositionEvent| {
390                if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
391                    strategy.handle_position_event(event.clone());
392                } else {
393                    log::error!("Strategy {actor_id} not found for position event handling");
394                }
395            },
396        )));
397        msgbus::subscribe_topic(position_topic, handler, None);
398
399        self.strategy_ids.push(strategy_id);
400
401        log::info!(
402            "Registered strategy {strategy_id} with trader {}",
403            self.trader_id
404        );
405
406        Ok(())
407    }
408
409    /// Adds an execution algorithm to the trader.
410    ///
411    /// Execution algorithms are registered in both the component registry (for lifecycle
412    /// management) and the actor registry (for data callbacks via msgbus).
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if:
417    /// - The trader is not in a valid state for adding components.
418    /// - An execution algorithm with the same ID is already registered.
419    pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
420    where
421        T: DataActor + Component + Debug + 'static,
422    {
423        self.validate_component_registration()?;
424
425        let exec_algorithm_id =
426            ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
427
428        // Check for duplicate registration
429        if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
430            anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
431        }
432
433        let clock = self.create_component_clock();
434        let component_id = exec_algorithm.component_id();
435        self.clocks.insert(component_id, clock.clone());
436
437        exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
438
439        // Register in both component and actor registries
440        register_component_actor(exec_algorithm);
441
442        self.exec_algorithm_ids.push(exec_algorithm_id);
443
444        log::info!(
445            "Registered execution algorithm {exec_algorithm_id} with trader {}",
446            self.trader_id
447        );
448
449        Ok(())
450    }
451
452    /// Validates that the trader is in a valid state for component registration.
453    fn validate_component_registration(&self) -> anyhow::Result<()> {
454        match self.state {
455            ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
456                Ok(())
457            }
458            ComponentState::Running => {
459                anyhow::bail!("Cannot add components while trader is running")
460            }
461            ComponentState::Disposed => {
462                anyhow::bail!("Cannot add components to disposed trader")
463            }
464            _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
465        }
466    }
467
468    /// Starts all registered components.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if any component fails to start.
473    pub fn start_components(&mut self) -> anyhow::Result<()> {
474        for actor_id in &self.actor_ids {
475            log::debug!("Starting actor {actor_id}");
476            start_component(&actor_id.inner())?;
477        }
478
479        for strategy_id in &self.strategy_ids {
480            log::debug!("Starting strategy {strategy_id}");
481            start_component(&strategy_id.inner())?;
482        }
483
484        for exec_algorithm_id in &self.exec_algorithm_ids {
485            log::debug!("Starting execution algorithm {exec_algorithm_id}");
486            start_component(&exec_algorithm_id.inner())?;
487        }
488
489        Ok(())
490    }
491
492    /// Stops all registered components.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if any component fails to stop.
497    pub fn stop_components(&mut self) -> anyhow::Result<()> {
498        for actor_id in &self.actor_ids {
499            log::debug!("Stopping actor {actor_id}");
500            stop_component(&actor_id.inner())?;
501        }
502
503        for exec_algorithm_id in &self.exec_algorithm_ids {
504            log::debug!("Stopping execution algorithm {exec_algorithm_id}");
505            stop_component(&exec_algorithm_id.inner())?;
506        }
507
508        for strategy_id in &self.strategy_ids {
509            log::debug!("Stopping strategy {strategy_id}");
510            stop_component(&strategy_id.inner())?;
511        }
512
513        Ok(())
514    }
515
516    /// Resets all registered components.
517    ///
518    /// # Errors
519    ///
520    /// Returns an error if any component fails to reset.
521    pub fn reset_components(&mut self) -> anyhow::Result<()> {
522        for actor_id in &self.actor_ids {
523            log::debug!("Resetting actor {actor_id}");
524            reset_component(&actor_id.inner())?;
525        }
526
527        for strategy_id in &self.strategy_ids {
528            log::debug!("Resetting strategy {strategy_id}");
529            reset_component(&strategy_id.inner())?;
530        }
531
532        for exec_algorithm_id in &self.exec_algorithm_ids {
533            log::debug!("Resetting execution algorithm {exec_algorithm_id}");
534            reset_component(&exec_algorithm_id.inner())?;
535        }
536
537        Ok(())
538    }
539
540    /// Disposes of all registered components.
541    ///
542    /// # Errors
543    ///
544    /// Returns an error if any component fails to dispose.
545    pub fn dispose_components(&mut self) -> anyhow::Result<()> {
546        for actor_id in &self.actor_ids {
547            log::debug!("Disposing actor {actor_id}");
548            dispose_component(&actor_id.inner())?;
549        }
550
551        for strategy_id in &self.strategy_ids {
552            log::debug!("Disposing strategy {strategy_id}");
553            dispose_component(&strategy_id.inner())?;
554        }
555
556        for exec_algorithm_id in &self.exec_algorithm_ids {
557            log::debug!("Disposing execution algorithm {exec_algorithm_id}");
558            dispose_component(&exec_algorithm_id.inner())?;
559        }
560
561        self.actor_ids.clear();
562        self.strategy_ids.clear();
563        self.exec_algorithm_ids.clear();
564        self.clocks.clear();
565
566        Ok(())
567    }
568
569    /// Initializes the trader, transitioning from `PreInitialized` to `Ready` state.
570    ///
571    /// This method must be called before starting the trader.
572    ///
573    /// # Errors
574    ///
575    /// Returns an error if the trader cannot be initialized from its current state.
576    pub fn initialize(&mut self) -> anyhow::Result<()> {
577        let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
578        self.state = new_state;
579
580        Ok(())
581    }
582
583    fn on_start(&mut self) -> anyhow::Result<()> {
584        self.start_components()?;
585
586        // Transition to running state
587        self.ts_started = Some(self.clock.borrow().timestamp_ns());
588
589        Ok(())
590    }
591
592    fn on_stop(&mut self) -> anyhow::Result<()> {
593        self.stop_components()?;
594
595        self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
596
597        Ok(())
598    }
599
600    fn on_reset(&mut self) -> anyhow::Result<()> {
601        self.reset_components()?;
602
603        self.ts_started = None;
604        self.ts_stopped = None;
605
606        Ok(())
607    }
608
609    fn on_dispose(&mut self) -> anyhow::Result<()> {
610        if self.is_running() {
611            self.stop()?;
612        }
613
614        self.dispose_components()?;
615
616        Ok(())
617    }
618}
619
620impl Component for Trader {
621    fn component_id(&self) -> ComponentId {
622        ComponentId::new(format!("Trader-{}", self.trader_id))
623    }
624
625    fn state(&self) -> ComponentState {
626        self.state
627    }
628
629    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
630        self.state = self.state.transition(&trigger)?;
631        log::info!("{}", self.state.variant_name());
632        Ok(())
633    }
634
635    fn register(
636        &mut self,
637        _trader_id: TraderId,
638        _clock: Rc<RefCell<dyn Clock>>,
639        _cache: Rc<RefCell<Cache>>,
640    ) -> anyhow::Result<()> {
641        anyhow::bail!("Trader cannot register with itself")
642    }
643
644    fn on_start(&mut self) -> anyhow::Result<()> {
645        Self::on_start(self)
646    }
647
648    fn on_stop(&mut self) -> anyhow::Result<()> {
649        Self::on_stop(self)
650    }
651
652    fn on_reset(&mut self) -> anyhow::Result<()> {
653        Self::on_reset(self)
654    }
655
656    fn on_dispose(&mut self) -> anyhow::Result<()> {
657        Self::on_dispose(self)
658    }
659}
660
661#[cfg(test)]
662mod tests {
663    use std::{
664        cell::RefCell,
665        ops::{Deref, DerefMut},
666        rc::Rc,
667    };
668
669    use nautilus_common::{
670        actor::{DataActorCore, data_actor::DataActorConfig},
671        cache::Cache,
672        clock::TestClock,
673        enums::{ComponentState, Environment},
674        msgbus::MessageBus,
675    };
676    use nautilus_core::UUID4;
677    use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
678    use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
679    use nautilus_model::{
680        identifiers::{ActorId, ComponentId, TraderId},
681        stubs::TestDefault,
682    };
683    use nautilus_portfolio::portfolio::Portfolio;
684    use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
685    use nautilus_trading::strategy::{
686        Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
687    };
688    use rstest::rstest;
689
690    use super::*;
691
692    // Simple DataActor wrapper for testing
693    #[derive(Debug)]
694    struct TestDataActor {
695        core: DataActorCore,
696    }
697
698    impl TestDataActor {
699        fn new(config: DataActorConfig) -> Self {
700            Self {
701                core: DataActorCore::new(config),
702            }
703        }
704    }
705
706    impl DataActor for TestDataActor {}
707
708    impl Deref for TestDataActor {
709        type Target = DataActorCore;
710        fn deref(&self) -> &Self::Target {
711            &self.core
712        }
713    }
714
715    impl DerefMut for TestDataActor {
716        fn deref_mut(&mut self) -> &mut Self::Target {
717            &mut self.core
718        }
719    }
720
721    // Simple Strategy wrapper for testing
722    #[derive(Debug)]
723    struct TestStrategy {
724        core: StrategyCore,
725    }
726
727    impl TestStrategy {
728        fn new(config: StrategyConfig) -> Self {
729            Self {
730                core: StrategyCore::new(config),
731            }
732        }
733    }
734
735    impl DataActor for TestStrategy {}
736
737    // Deref through StrategyCore to DataActorCore
738    impl Deref for TestStrategy {
739        type Target = DataActorCore;
740        fn deref(&self) -> &Self::Target {
741            &self.core
742        }
743    }
744
745    impl DerefMut for TestStrategy {
746        fn deref_mut(&mut self) -> &mut Self::Target {
747            &mut self.core
748        }
749    }
750
751    impl StrategyTrait for TestStrategy {
752        fn core_mut(&mut self) -> &mut StrategyCore {
753            &mut self.core
754        }
755    }
756
757    #[allow(clippy::type_complexity)]
758    fn create_trader_components() -> (
759        Rc<RefCell<MessageBus>>,
760        Rc<RefCell<Cache>>,
761        Rc<RefCell<Portfolio>>,
762        Rc<RefCell<DataEngine>>,
763        Rc<RefCell<RiskEngine>>,
764        Rc<RefCell<ExecutionEngine>>,
765        Rc<RefCell<TestClock>>,
766    ) {
767        let trader_id = TraderId::test_default();
768        let instance_id = UUID4::new();
769        let clock = Rc::new(RefCell::new(TestClock::new()));
770        // Set the clock to a non-zero time for test purposes
771        clock.borrow_mut().set_time(1_000_000_000u64.into());
772        let msgbus = Rc::new(RefCell::new(MessageBus::new(
773            trader_id,
774            instance_id,
775            Some("test".to_string()),
776            None,
777        )));
778        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
779        let portfolio = Rc::new(RefCell::new(Portfolio::new(
780            cache.clone(),
781            clock.clone() as Rc<RefCell<dyn Clock>>,
782            None,
783        )));
784        let data_engine = Rc::new(RefCell::new(DataEngine::new(
785            clock.clone(),
786            cache.clone(),
787            Some(DataEngineConfig::default()),
788        )));
789
790        // Create separate cache and clock instances for RiskEngine to avoid borrowing conflicts
791        let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
792        let risk_clock = Rc::new(RefCell::new(TestClock::new()));
793        let risk_portfolio = Portfolio::new(
794            risk_cache.clone(),
795            risk_clock.clone() as Rc<RefCell<dyn Clock>>,
796            None,
797        );
798        let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
799            RiskEngineConfig::default(),
800            risk_portfolio,
801            risk_clock as Rc<RefCell<dyn Clock>>,
802            risk_cache,
803        )));
804        let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
805            clock.clone(),
806            cache.clone(),
807            Some(ExecutionEngineConfig::default()),
808        )));
809
810        (
811            msgbus,
812            cache,
813            portfolio,
814            data_engine,
815            risk_engine,
816            exec_engine,
817            clock,
818        )
819    }
820
821    #[rstest]
822    fn test_trader_creation() {
823        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
824            create_trader_components();
825        let trader_id = TraderId::test_default();
826        let instance_id = UUID4::new();
827
828        let trader = Trader::new(
829            trader_id,
830            instance_id,
831            Environment::Backtest,
832            clock,
833            cache,
834            portfolio,
835        );
836
837        assert_eq!(trader.trader_id(), trader_id);
838        assert_eq!(trader.instance_id(), instance_id);
839        assert_eq!(trader.environment(), Environment::Backtest);
840        assert_eq!(trader.state(), ComponentState::PreInitialized);
841        assert_eq!(trader.actor_count(), 0);
842        assert_eq!(trader.strategy_count(), 0);
843        assert_eq!(trader.exec_algorithm_count(), 0);
844        assert_eq!(trader.component_count(), 0);
845        assert!(!trader.is_running());
846        assert!(!trader.is_stopped());
847        assert!(!trader.is_disposed());
848        assert!(trader.ts_created() > 0);
849        assert!(trader.ts_started().is_none());
850        assert!(trader.ts_stopped().is_none());
851    }
852
853    #[rstest]
854    fn test_trader_component_id() {
855        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
856            create_trader_components();
857        let trader_id = TraderId::from("TRADER-001");
858        let instance_id = UUID4::new();
859
860        let trader = Trader::new(
861            trader_id,
862            instance_id,
863            Environment::Backtest,
864            clock,
865            cache,
866            portfolio,
867        );
868
869        assert_eq!(
870            trader.component_id(),
871            ComponentId::from("Trader-TRADER-001")
872        );
873    }
874
875    #[rstest]
876    fn test_add_actor_success() {
877        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
878            create_trader_components();
879        let trader_id = TraderId::test_default();
880        let instance_id = UUID4::new();
881
882        let mut trader = Trader::new(
883            trader_id,
884            instance_id,
885            Environment::Backtest,
886            clock,
887            cache,
888            portfolio,
889        );
890
891        let actor = TestDataActor::new(DataActorConfig::default());
892        let actor_id = actor.actor_id();
893
894        let result = trader.add_actor(actor);
895        assert!(result.is_ok());
896        assert_eq!(trader.actor_count(), 1);
897        assert_eq!(trader.component_count(), 1);
898        assert!(trader.actor_ids().contains(&actor_id));
899    }
900
901    #[rstest]
902    fn test_add_duplicate_actor_fails() {
903        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
904            create_trader_components();
905        let trader_id = TraderId::test_default();
906        let instance_id = UUID4::new();
907
908        let mut trader = Trader::new(
909            trader_id,
910            instance_id,
911            Environment::Backtest,
912            clock,
913            cache,
914            portfolio,
915        );
916
917        let config = DataActorConfig {
918            actor_id: Some(ActorId::from("TestActor")),
919            ..Default::default()
920        };
921        let actor1 = TestDataActor::new(config.clone());
922        let actor2 = TestDataActor::new(config);
923
924        // First addition should succeed
925        assert!(trader.add_actor(actor1).is_ok());
926        assert_eq!(trader.actor_count(), 1);
927
928        // Second addition should fail
929        let result = trader.add_actor(actor2);
930        assert!(result.is_err());
931        assert!(
932            result
933                .unwrap_err()
934                .to_string()
935                .contains("already registered")
936        );
937        assert_eq!(trader.actor_count(), 1);
938    }
939
940    #[rstest]
941    fn test_add_strategy_success() {
942        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
943            create_trader_components();
944        let trader_id = TraderId::test_default();
945        let instance_id = UUID4::new();
946
947        let mut trader = Trader::new(
948            trader_id,
949            instance_id,
950            Environment::Backtest,
951            clock,
952            cache,
953            portfolio,
954        );
955
956        let config = StrategyConfig {
957            strategy_id: Some(StrategyId::from("Test-Strategy")),
958            ..Default::default()
959        };
960        let strategy = TestStrategy::new(config);
961        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
962
963        let result = trader.add_strategy(strategy);
964        assert!(result.is_ok());
965        assert_eq!(trader.strategy_count(), 1);
966        assert_eq!(trader.component_count(), 1);
967        assert!(trader.strategy_ids().contains(&strategy_id));
968    }
969
970    #[rstest]
971    fn test_add_exec_algorithm_success() {
972        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
973            create_trader_components();
974        let trader_id = TraderId::test_default();
975        let instance_id = UUID4::new();
976
977        let mut trader = Trader::new(
978            trader_id,
979            instance_id,
980            Environment::Backtest,
981            clock,
982            cache,
983            portfolio,
984        );
985
986        let config = DataActorConfig {
987            actor_id: Some(ActorId::from("TestExecAlgorithm")),
988            ..Default::default()
989        };
990        let exec_algorithm = TestDataActor::new(config);
991        let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
992
993        let result = trader.add_exec_algorithm(exec_algorithm);
994        assert!(result.is_ok());
995        assert_eq!(trader.exec_algorithm_count(), 1);
996        assert_eq!(trader.component_count(), 1);
997        assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
998    }
999
1000    #[rstest]
1001    fn test_component_lifecycle() {
1002        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1003            create_trader_components();
1004        let trader_id = TraderId::test_default();
1005        let instance_id = UUID4::new();
1006
1007        let mut trader = Trader::new(
1008            trader_id,
1009            instance_id,
1010            Environment::Backtest,
1011            clock,
1012            cache,
1013            portfolio,
1014        );
1015
1016        // Add components
1017        let actor = TestDataActor::new(DataActorConfig::default());
1018
1019        let strategy_config = StrategyConfig {
1020            strategy_id: Some(StrategyId::from("Test-Strategy")),
1021            ..Default::default()
1022        };
1023        let strategy = TestStrategy::new(strategy_config);
1024
1025        let exec_algorithm_config = DataActorConfig {
1026            actor_id: Some(ActorId::from("TestExecAlgorithm")),
1027            ..Default::default()
1028        };
1029        let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1030
1031        assert!(trader.add_actor(actor).is_ok());
1032        assert!(trader.add_strategy(strategy).is_ok());
1033        assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1034        assert_eq!(trader.component_count(), 3);
1035
1036        // Test start components
1037        let start_result = trader.start_components();
1038        assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1039
1040        // Test stop components
1041        assert!(trader.stop_components().is_ok());
1042
1043        // Test reset components
1044        assert!(trader.reset_components().is_ok());
1045
1046        // Test dispose components
1047        assert!(trader.dispose_components().is_ok());
1048        assert_eq!(trader.component_count(), 0);
1049    }
1050
1051    #[rstest]
1052    fn test_trader_component_lifecycle() {
1053        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1054            create_trader_components();
1055        let trader_id = TraderId::test_default();
1056        let instance_id = UUID4::new();
1057
1058        let mut trader = Trader::new(
1059            trader_id,
1060            instance_id,
1061            Environment::Backtest,
1062            clock,
1063            cache,
1064            portfolio,
1065        );
1066
1067        // Initially pre-initialized
1068        assert_eq!(trader.state(), ComponentState::PreInitialized);
1069        assert!(!trader.is_running());
1070        assert!(!trader.is_stopped());
1071        assert!(!trader.is_disposed());
1072
1073        // Cannot start from pre-initialized state
1074        assert!(trader.start().is_err());
1075
1076        // Simulate initialization (normally done by kernel)
1077        trader.initialize().unwrap();
1078
1079        // Test start
1080        assert!(trader.start().is_ok());
1081        assert_eq!(trader.state(), ComponentState::Running);
1082        assert!(trader.is_running());
1083        assert!(trader.ts_started().is_some());
1084
1085        // Test stop
1086        assert!(trader.stop().is_ok());
1087        assert_eq!(trader.state(), ComponentState::Stopped);
1088        assert!(trader.is_stopped());
1089        assert!(trader.ts_stopped().is_some());
1090
1091        // Test reset
1092        assert!(trader.reset().is_ok());
1093        assert_eq!(trader.state(), ComponentState::Ready);
1094        assert!(trader.ts_started().is_none());
1095        assert!(trader.ts_stopped().is_none());
1096
1097        // Test dispose
1098        assert!(trader.dispose().is_ok());
1099        assert_eq!(trader.state(), ComponentState::Disposed);
1100        assert!(trader.is_disposed());
1101    }
1102
1103    #[rstest]
1104    fn test_cannot_add_components_while_running() {
1105        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1106            create_trader_components();
1107        let trader_id = TraderId::test_default();
1108        let instance_id = UUID4::new();
1109
1110        let mut trader = Trader::new(
1111            trader_id,
1112            instance_id,
1113            Environment::Backtest,
1114            clock,
1115            cache,
1116            portfolio,
1117        );
1118
1119        // Simulate running state
1120        trader.state = ComponentState::Running;
1121
1122        let actor = TestDataActor::new(DataActorConfig::default());
1123        let result = trader.add_actor(actor);
1124        assert!(result.is_err());
1125        assert!(
1126            result
1127                .unwrap_err()
1128                .to_string()
1129                .contains("while trader is running")
1130        );
1131    }
1132
1133    #[rstest]
1134    fn test_create_component_clock_backtest_vs_live() {
1135        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1136            create_trader_components();
1137        let trader_id = TraderId::test_default();
1138        let instance_id = UUID4::new();
1139
1140        // Test backtest environment - should create individual test clocks
1141        let trader_backtest = Trader::new(
1142            trader_id,
1143            instance_id,
1144            Environment::Backtest,
1145            clock.clone(),
1146            cache.clone(),
1147            portfolio.clone(),
1148        );
1149
1150        let backtest_clock = trader_backtest.create_component_clock();
1151        // In backtest, component clock should be different from system clock
1152        assert_ne!(
1153            backtest_clock.as_ptr() as *const _,
1154            clock.as_ptr() as *const _
1155        );
1156
1157        // Test live environment - should share system clock
1158        let trader_live = Trader::new(
1159            trader_id,
1160            instance_id,
1161            Environment::Live,
1162            clock.clone(),
1163            cache,
1164            portfolio,
1165        );
1166
1167        let live_clock = trader_live.create_component_clock();
1168        // In live, component clock should be same as system clock
1169        assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1170    }
1171}