Skip to main content

nautilus_system/
trader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Central orchestrator for managing actors, strategies, and execution algorithms.
17//!
18//! The `Trader` component serves as the primary coordination layer between the kernel
19//! and individual trading components. It manages component lifecycles, provides
20//! unique identification, and coordinates with system engines.
21
22use std::{cell::RefCell, fmt::Debug, rc::Rc};
23
24use ahash::AHashMap;
25use nautilus_common::{
26    actor::{DataActor, registry::try_get_actor_unchecked},
27    cache::Cache,
28    clock::{Clock, TestClock},
29    component::{
30        Component, dispose_component, register_component_actor, reset_component, start_component,
31        stop_component,
32    },
33    enums::{ComponentState, ComponentTrigger, Environment},
34    msgbus,
35    msgbus::{
36        TypedHandler,
37        switchboard::{get_event_orders_topic, get_event_positions_topic},
38    },
39    timer::{TimeEvent, TimeEventCallback},
40};
41use nautilus_core::{UUID4, UnixNanos};
42use nautilus_model::{
43    events::{OrderEventAny, PositionEvent},
44    identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
45};
46use nautilus_portfolio::portfolio::Portfolio;
47use nautilus_trading::strategy::Strategy;
48use ustr::Ustr;
49
50/// Central orchestrator for managing trading components.
51///
52/// The `Trader` manages the lifecycle and coordination of actors, strategies,
53/// and execution algorithms within the trading system. It provides component
54/// registration, state management, and integration with system engines.
55///
56/// # Notes
57///
58/// Strategies implement `Strategy::stop() -> bool` which returns whether to proceed
59/// with the component stop. This enables `manage_stop` behavior where the strategy
60/// can defer stopping until a market exit completes.
61///
62/// We store type-erased closures because the component registry stores trait objects
63/// and we need to call `Strategy::stop()` which requires the concrete type. The
64/// closure is created during `add_strategy` when the concrete type `T` is known.
65pub struct Trader {
66    /// The unique trader identifier.
67    pub trader_id: TraderId,
68    /// The unique instance identifier.
69    pub instance_id: UUID4,
70    /// The trading environment context.
71    pub environment: Environment,
72    /// Component state for lifecycle management.
73    state: ComponentState,
74    /// System clock for timestamping.
75    clock: Rc<RefCell<dyn Clock>>,
76    /// System cache for data storage.
77    cache: Rc<RefCell<Cache>>,
78    /// Portfolio reference for strategy registration.
79    portfolio: Rc<RefCell<Portfolio>>,
80    /// Registered actor IDs (actors stored in global registry).
81    actor_ids: Vec<ActorId>,
82    /// Registered strategy IDs (strategies stored in global registry).
83    strategy_ids: Vec<StrategyId>,
84    /// Strategy stop functions for managed stop behavior.
85    strategy_stop_fns: AHashMap<StrategyId, Box<dyn FnMut() -> bool>>,
86    /// Msgbus handler IDs for strategy event subscriptions (order, position).
87    strategy_handler_ids: AHashMap<StrategyId, (Ustr, Ustr)>,
88    /// Registered exec algorithm IDs (algorithms stored in global registry).
89    exec_algorithm_ids: Vec<ExecAlgorithmId>,
90    /// Component clocks for individual components.
91    clocks: AHashMap<ComponentId, Rc<RefCell<dyn Clock>>>,
92    /// Timestamp when the trader was created.
93    ts_created: UnixNanos,
94    /// Timestamp when the trader was last started.
95    ts_started: Option<UnixNanos>,
96    /// Timestamp when the trader was last stopped.
97    ts_stopped: Option<UnixNanos>,
98}
99
100impl Debug for Trader {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(f, "{:?}", stringify!(TraderId)) // TODO
103    }
104}
105
106impl Trader {
107    /// Creates a new [`Trader`] instance.
108    #[must_use]
109    pub fn new(
110        trader_id: TraderId,
111        instance_id: UUID4,
112        environment: Environment,
113        clock: Rc<RefCell<dyn Clock>>,
114        cache: Rc<RefCell<Cache>>,
115        portfolio: Rc<RefCell<Portfolio>>,
116    ) -> Self {
117        let ts_created = clock.borrow().timestamp_ns();
118
119        Self {
120            trader_id,
121            instance_id,
122            environment,
123            state: ComponentState::PreInitialized,
124            clock,
125            cache,
126            portfolio,
127            actor_ids: Vec::new(),
128            strategy_ids: Vec::new(),
129            strategy_stop_fns: AHashMap::new(),
130            strategy_handler_ids: AHashMap::new(),
131            exec_algorithm_ids: Vec::new(),
132            clocks: AHashMap::new(),
133            ts_created,
134            ts_started: None,
135            ts_stopped: None,
136        }
137    }
138
139    /// Returns the trader ID.
140    #[must_use]
141    pub const fn trader_id(&self) -> TraderId {
142        self.trader_id
143    }
144
145    /// Returns the instance ID.
146    #[must_use]
147    pub const fn instance_id(&self) -> UUID4 {
148        self.instance_id
149    }
150
151    /// Returns the trading environment.
152    #[must_use]
153    pub const fn environment(&self) -> Environment {
154        self.environment
155    }
156
157    /// Returns the current component state.
158    #[must_use]
159    pub const fn state(&self) -> ComponentState {
160        self.state
161    }
162
163    /// Returns the timestamp when the trader was created (UNIX nanoseconds).
164    #[must_use]
165    pub const fn ts_created(&self) -> UnixNanos {
166        self.ts_created
167    }
168
169    /// Returns the timestamp when the trader was last started (UNIX nanoseconds).
170    #[must_use]
171    pub const fn ts_started(&self) -> Option<UnixNanos> {
172        self.ts_started
173    }
174
175    /// Returns the timestamp when the trader was last stopped (UNIX nanoseconds).
176    #[must_use]
177    pub const fn ts_stopped(&self) -> Option<UnixNanos> {
178        self.ts_stopped
179    }
180
181    /// Returns the number of registered actors.
182    #[must_use]
183    pub const fn actor_count(&self) -> usize {
184        self.actor_ids.len()
185    }
186
187    /// Returns the number of registered strategies.
188    #[must_use]
189    pub const fn strategy_count(&self) -> usize {
190        self.strategy_ids.len()
191    }
192
193    /// Returns the number of registered execution algorithms.
194    #[must_use]
195    pub const fn exec_algorithm_count(&self) -> usize {
196        self.exec_algorithm_ids.len()
197    }
198
199    /// Returns references to all component clocks for backtest time advancement.
200    pub fn get_component_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
201        self.clocks.values().cloned().collect()
202    }
203
204    /// Returns the total number of registered components.
205    #[must_use]
206    pub const fn component_count(&self) -> usize {
207        self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
208    }
209
210    /// Returns a list of all registered actor IDs.
211    #[must_use]
212    pub fn actor_ids(&self) -> Vec<ActorId> {
213        self.actor_ids.clone()
214    }
215
216    /// Returns a list of all registered strategy IDs.
217    #[must_use]
218    pub fn strategy_ids(&self) -> Vec<StrategyId> {
219        self.strategy_ids.clone()
220    }
221
222    /// Returns a list of all registered execution algorithm IDs.
223    #[must_use]
224    pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
225        self.exec_algorithm_ids.clone()
226    }
227
228    /// Creates a clock for a component.
229    ///
230    /// Creates a test clock in backtest environment, otherwise returns a reference
231    /// to the system clock.
232    fn create_component_clock(&self) -> Rc<RefCell<dyn Clock>> {
233        match self.environment {
234            Environment::Backtest => {
235                // Create individual test clock for component in backtest
236                Rc::new(RefCell::new(TestClock::new()))
237            }
238            Environment::Live | Environment::Sandbox => {
239                // Share system clock in live environments
240                self.clock.clone()
241            }
242        }
243    }
244
245    /// Adds an actor to the trader.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if:
250    /// - The trader is not in a valid state for adding components.
251    /// - An actor with the same ID is already registered.
252    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
253    where
254        T: DataActor + Component + Debug + 'static,
255    {
256        self.validate_component_registration()?;
257
258        let actor_id = actor.actor_id();
259
260        // Check for duplicate registration
261        if self.actor_ids.contains(&actor_id) {
262            anyhow::bail!("Actor {actor_id} is already registered");
263        }
264
265        let clock = self.create_component_clock();
266        let component_id = ComponentId::new(actor_id.inner().as_str());
267        self.clocks.insert(component_id, clock.clone());
268
269        let mut actor_mut = actor;
270        actor_mut.register(self.trader_id, clock, self.cache.clone())?;
271
272        self.add_registered_actor(actor_mut)
273    }
274
275    /// Adds an actor to the trader using a factory function.
276    ///
277    /// The factory function is called at registration time to create the actor,
278    /// avoiding cloning issues with non-cloneable actor types.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if:
283    /// - The factory function fails to create the actor.
284    /// - The trader is not in a valid state for adding components.
285    /// - An actor with the same ID is already registered.
286    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
287    where
288        F: FnOnce() -> anyhow::Result<T>,
289        T: DataActor + Component + Debug + 'static,
290    {
291        let actor = factory()?;
292
293        self.add_actor(actor)
294    }
295
296    /// Adds an already registered actor to the trader's component registry.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the actor cannot be registered in the component registry.
301    pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
302    where
303        T: DataActor + Component + Debug + 'static,
304    {
305        let actor_id = actor.actor_id();
306
307        // Register in both component and actor registries (this consumes the actor)
308        register_component_actor(actor);
309
310        // Store actor ID for lifecycle management
311        self.actor_ids.push(actor_id);
312
313        log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
314
315        Ok(())
316    }
317
318    /// Adds an actor ID to the trader's lifecycle management without consuming the actor.
319    ///
320    /// This is useful when the actor is already registered in the global component registry
321    /// but the trader needs to track it for lifecycle management. The caller is responsible
322    /// for ensuring the actor is properly registered in the global registries.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the actor ID is already tracked by this trader.
327    pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
328        // Check for duplicate registration
329        if self.actor_ids.contains(&actor_id) {
330            anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
331        }
332
333        // Store actor ID for lifecycle management
334        self.actor_ids.push(actor_id);
335
336        log::debug!(
337            "Added actor ID '{actor_id}' to trader {} for lifecycle management",
338            self.trader_id
339        );
340
341        Ok(())
342    }
343
344    /// Adds a strategy to the trader.
345    ///
346    /// Strategies are registered in both the component registry (for lifecycle management)
347    /// and the actor registry (for data callbacks via msgbus). The strategy's `StrategyCore`
348    /// is also registered with the portfolio for order management.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if:
353    /// - The trader is not in a valid state for adding components.
354    /// - A strategy with the same ID is already registered.
355    pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
356    where
357        T: Strategy + Component + Debug + 'static,
358    {
359        self.validate_component_registration()?;
360
361        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
362
363        // Check for duplicate registration
364        if self.strategy_ids.contains(&strategy_id) {
365            anyhow::bail!("Strategy {strategy_id} is already registered");
366        }
367
368        let clock = self.create_component_clock();
369        let component_id = strategy.component_id();
370        self.clocks.insert(component_id, clock.clone());
371
372        // Register strategy core with portfolio for order management
373        strategy.core_mut().register(
374            self.trader_id,
375            clock.clone(),
376            self.cache.clone(),
377            self.portfolio.clone(),
378        )?;
379
380        // Register default time event handler for this strategy
381        let actor_id = strategy.actor_id().inner();
382        let callback = TimeEventCallback::from(move |event: TimeEvent| {
383            if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
384                actor.handle_time_event(&event);
385            } else {
386                log::error!("Strategy {actor_id} not found for time event handling");
387            }
388        });
389        clock.borrow_mut().register_default_handler(callback);
390
391        // Transition to Ready state
392        strategy.initialize()?;
393
394        // Register in both component and actor registries
395        register_component_actor(strategy);
396
397        let order_topic = get_event_orders_topic(strategy_id);
398        let order_actor_id = actor_id;
399        let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
400            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
401                strategy.handle_order_event(event.clone());
402            } else {
403                log::error!("Strategy {order_actor_id} not found for order event handling");
404            }
405        });
406        let order_handler_id = order_handler.id();
407        msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
408
409        let position_topic = get_event_positions_topic(strategy_id);
410        let position_handler = TypedHandler::from(move |event: &PositionEvent| {
411            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
412                strategy.handle_position_event(event.clone());
413            } else {
414                log::error!("Strategy {actor_id} not found for position event handling");
415            }
416        });
417        let position_handler_id = position_handler.id();
418        msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
419
420        self.strategy_ids.push(strategy_id);
421        self.strategy_handler_ids
422            .insert(strategy_id, (order_handler_id, position_handler_id));
423
424        let stop_actor_id = actor_id;
425        let stop_fn = Box::new(move || -> bool {
426            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
427                Strategy::stop(&mut *strategy)
428            } else {
429                log::error!("Strategy {stop_actor_id} not found for stop");
430                true // Proceed with component stop anyway
431            }
432        });
433        self.strategy_stop_fns.insert(strategy_id, stop_fn);
434
435        log::info!(
436            "Registered strategy {strategy_id} with trader {}",
437            self.trader_id
438        );
439
440        Ok(())
441    }
442
443    /// Adds an execution algorithm to the trader.
444    ///
445    /// Execution algorithms are registered in both the component registry (for lifecycle
446    /// management) and the actor registry (for data callbacks via msgbus).
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if:
451    /// - The trader is not in a valid state for adding components.
452    /// - An execution algorithm with the same ID is already registered.
453    pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
454    where
455        T: DataActor + Component + Debug + 'static,
456    {
457        self.validate_component_registration()?;
458
459        let exec_algorithm_id =
460            ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
461
462        // Check for duplicate registration
463        if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
464            anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
465        }
466
467        let clock = self.create_component_clock();
468        let component_id = exec_algorithm.component_id();
469        self.clocks.insert(component_id, clock.clone());
470
471        exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
472
473        // Register in both component and actor registries
474        register_component_actor(exec_algorithm);
475
476        self.exec_algorithm_ids.push(exec_algorithm_id);
477
478        log::info!(
479            "Registered execution algorithm {exec_algorithm_id} with trader {}",
480            self.trader_id
481        );
482
483        Ok(())
484    }
485
486    /// Validates that the trader is in a valid state for component registration.
487    fn validate_component_registration(&self) -> anyhow::Result<()> {
488        match self.state {
489            ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
490                Ok(())
491            }
492            ComponentState::Running => {
493                anyhow::bail!("Cannot add components while trader is running")
494            }
495            ComponentState::Disposed => {
496                anyhow::bail!("Cannot add components to disposed trader")
497            }
498            _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
499        }
500    }
501
502    /// Starts all registered components.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if any component fails to start.
507    pub fn start_components(&mut self) -> anyhow::Result<()> {
508        for actor_id in &self.actor_ids {
509            log::debug!("Starting actor {actor_id}");
510            start_component(&actor_id.inner())?;
511        }
512
513        for strategy_id in &self.strategy_ids {
514            log::debug!("Starting strategy {strategy_id}");
515            start_component(&strategy_id.inner())?;
516        }
517
518        for exec_algorithm_id in &self.exec_algorithm_ids {
519            log::debug!("Starting execution algorithm {exec_algorithm_id}");
520            start_component(&exec_algorithm_id.inner())?;
521        }
522
523        Ok(())
524    }
525
526    /// Stops all registered components.
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if any component fails to stop.
531    pub fn stop_components(&mut self) -> anyhow::Result<()> {
532        for actor_id in &self.actor_ids {
533            log::debug!("Stopping actor {actor_id}");
534            stop_component(&actor_id.inner())?;
535        }
536
537        for exec_algorithm_id in &self.exec_algorithm_ids {
538            log::debug!("Stopping execution algorithm {exec_algorithm_id}");
539            stop_component(&exec_algorithm_id.inner())?;
540        }
541
542        for strategy_id in self.strategy_ids.clone() {
543            log::debug!("Stopping strategy {strategy_id}");
544            let should_proceed = self
545                .strategy_stop_fns
546                .get_mut(&strategy_id)
547                .is_none_or(|stop_fn| stop_fn());
548            if should_proceed {
549                stop_component(&strategy_id.inner())?;
550            }
551        }
552
553        Ok(())
554    }
555
556    /// Resets all registered components.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if any component fails to reset.
561    pub fn reset_components(&mut self) -> anyhow::Result<()> {
562        for actor_id in &self.actor_ids {
563            log::debug!("Resetting actor {actor_id}");
564            reset_component(&actor_id.inner())?;
565        }
566
567        for strategy_id in &self.strategy_ids {
568            log::debug!("Resetting strategy {strategy_id}");
569            reset_component(&strategy_id.inner())?;
570        }
571
572        for exec_algorithm_id in &self.exec_algorithm_ids {
573            log::debug!("Resetting execution algorithm {exec_algorithm_id}");
574            reset_component(&exec_algorithm_id.inner())?;
575        }
576
577        Ok(())
578    }
579
580    /// Disposes of all registered components.
581    ///
582    /// # Errors
583    ///
584    /// Returns an error if any component fails to dispose.
585    pub fn dispose_components(&mut self) -> anyhow::Result<()> {
586        for actor_id in &self.actor_ids {
587            log::debug!("Disposing actor {actor_id}");
588            dispose_component(&actor_id.inner())?;
589        }
590
591        for strategy_id in &self.strategy_ids {
592            log::debug!("Disposing strategy {strategy_id}");
593            dispose_component(&strategy_id.inner())?;
594        }
595
596        for exec_algorithm_id in &self.exec_algorithm_ids {
597            log::debug!("Disposing execution algorithm {exec_algorithm_id}");
598            dispose_component(&exec_algorithm_id.inner())?;
599        }
600
601        self.actor_ids.clear();
602        self.strategy_ids.clear();
603        self.exec_algorithm_ids.clear();
604        self.clocks.clear();
605
606        Ok(())
607    }
608
609    /// Clears all registered strategies, disposing each and removing their clocks.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if any strategy fails to dispose.
614    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
615        for strategy_id in &self.strategy_ids {
616            log::debug!("Disposing strategy {strategy_id}");
617            dispose_component(&strategy_id.inner())?;
618            let component_id = ComponentId::new(strategy_id.inner().as_str());
619            self.clocks.remove(&component_id);
620
621            // Remove only this strategy's own msgbus handlers
622            if let Some((order_hid, position_hid)) = self.strategy_handler_ids.get(strategy_id) {
623                let order_topic = get_event_orders_topic(*strategy_id);
624                let position_topic = get_event_positions_topic(*strategy_id);
625                msgbus::remove_order_event_handler(order_topic.into(), *order_hid);
626                msgbus::remove_position_event_handler(position_topic.into(), *position_hid);
627            }
628        }
629
630        self.strategy_ids.clear();
631        self.strategy_stop_fns.clear();
632        self.strategy_handler_ids.clear();
633
634        Ok(())
635    }
636
637    /// Clears all registered execution algorithms, disposing each and removing their clocks.
638    ///
639    /// # Errors
640    ///
641    /// Returns an error if any execution algorithm fails to dispose.
642    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
643        for exec_algorithm_id in &self.exec_algorithm_ids {
644            log::debug!("Disposing execution algorithm {exec_algorithm_id}");
645            dispose_component(&exec_algorithm_id.inner())?;
646            let component_id = ComponentId::new(exec_algorithm_id.inner().as_str());
647            self.clocks.remove(&component_id);
648        }
649
650        self.exec_algorithm_ids.clear();
651
652        Ok(())
653    }
654
655    /// Initializes the trader, transitioning from `PreInitialized` to `Ready` state.
656    ///
657    /// This method must be called before starting the trader.
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if the trader cannot be initialized from its current state.
662    pub fn initialize(&mut self) -> anyhow::Result<()> {
663        let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
664        self.state = new_state;
665
666        Ok(())
667    }
668
669    fn on_start(&mut self) -> anyhow::Result<()> {
670        self.start_components()?;
671
672        // Transition to running state
673        self.ts_started = Some(self.clock.borrow().timestamp_ns());
674
675        Ok(())
676    }
677
678    fn on_stop(&mut self) -> anyhow::Result<()> {
679        self.stop_components()?;
680
681        self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
682
683        Ok(())
684    }
685
686    fn on_reset(&mut self) -> anyhow::Result<()> {
687        self.reset_components()?;
688
689        self.ts_started = None;
690        self.ts_stopped = None;
691
692        Ok(())
693    }
694
695    fn on_dispose(&mut self) -> anyhow::Result<()> {
696        if self.is_running() {
697            self.stop()?;
698        }
699
700        self.dispose_components()?;
701
702        Ok(())
703    }
704}
705
706impl Component for Trader {
707    fn component_id(&self) -> ComponentId {
708        ComponentId::new(format!("Trader-{}", self.trader_id))
709    }
710
711    fn state(&self) -> ComponentState {
712        self.state
713    }
714
715    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
716        self.state = self.state.transition(&trigger)?;
717        log::info!("{}", self.state.variant_name());
718        Ok(())
719    }
720
721    fn register(
722        &mut self,
723        _trader_id: TraderId,
724        _clock: Rc<RefCell<dyn Clock>>,
725        _cache: Rc<RefCell<Cache>>,
726    ) -> anyhow::Result<()> {
727        anyhow::bail!("Trader cannot register with itself")
728    }
729
730    fn on_start(&mut self) -> anyhow::Result<()> {
731        Self::on_start(self)
732    }
733
734    fn on_stop(&mut self) -> anyhow::Result<()> {
735        Self::on_stop(self)
736    }
737
738    fn on_reset(&mut self) -> anyhow::Result<()> {
739        Self::on_reset(self)
740    }
741
742    fn on_dispose(&mut self) -> anyhow::Result<()> {
743        Self::on_dispose(self)
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use std::{
750        cell::RefCell,
751        ops::{Deref, DerefMut},
752        rc::Rc,
753    };
754
755    use nautilus_common::{
756        actor::{DataActorCore, data_actor::DataActorConfig},
757        cache::Cache,
758        clock::TestClock,
759        enums::{ComponentState, Environment},
760        msgbus,
761        msgbus::{MessageBus, TypedHandler, switchboard::get_event_orders_topic},
762    };
763    use nautilus_core::UUID4;
764    use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
765    use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
766    use nautilus_model::{
767        events::OrderAccepted,
768        identifiers::{ActorId, ComponentId, TraderId},
769        stubs::TestDefault,
770    };
771    use nautilus_portfolio::portfolio::Portfolio;
772    use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
773    use nautilus_trading::strategy::{
774        Strategy as StrategyTrait, config::StrategyConfig, core::StrategyCore,
775    };
776    use rstest::rstest;
777
778    use super::*;
779
780    // Simple DataActor wrapper for testing
781    #[derive(Debug)]
782    struct TestDataActor {
783        core: DataActorCore,
784    }
785
786    impl TestDataActor {
787        fn new(config: DataActorConfig) -> Self {
788            Self {
789                core: DataActorCore::new(config),
790            }
791        }
792    }
793
794    impl DataActor for TestDataActor {}
795
796    impl Deref for TestDataActor {
797        type Target = DataActorCore;
798        fn deref(&self) -> &Self::Target {
799            &self.core
800        }
801    }
802
803    impl DerefMut for TestDataActor {
804        fn deref_mut(&mut self) -> &mut Self::Target {
805            &mut self.core
806        }
807    }
808
809    // Simple Strategy wrapper for testing
810    #[derive(Debug)]
811    struct TestStrategy {
812        core: StrategyCore,
813    }
814
815    impl TestStrategy {
816        fn new(config: StrategyConfig) -> Self {
817            Self {
818                core: StrategyCore::new(config),
819            }
820        }
821    }
822
823    impl DataActor for TestStrategy {}
824
825    impl Deref for TestStrategy {
826        type Target = DataActorCore;
827        fn deref(&self) -> &Self::Target {
828            &self.core
829        }
830    }
831
832    impl DerefMut for TestStrategy {
833        fn deref_mut(&mut self) -> &mut Self::Target {
834            &mut self.core
835        }
836    }
837
838    impl StrategyTrait for TestStrategy {
839        fn core(&self) -> &StrategyCore {
840            &self.core
841        }
842
843        fn core_mut(&mut self) -> &mut StrategyCore {
844            &mut self.core
845        }
846    }
847
848    #[allow(clippy::type_complexity)]
849    fn create_trader_components() -> (
850        Rc<RefCell<MessageBus>>,
851        Rc<RefCell<Cache>>,
852        Rc<RefCell<Portfolio>>,
853        Rc<RefCell<DataEngine>>,
854        Rc<RefCell<RiskEngine>>,
855        Rc<RefCell<ExecutionEngine>>,
856        Rc<RefCell<TestClock>>,
857    ) {
858        let trader_id = TraderId::test_default();
859        let instance_id = UUID4::new();
860        let clock = Rc::new(RefCell::new(TestClock::new()));
861        // Set the clock to a non-zero time for test purposes
862        clock.borrow_mut().set_time(1_000_000_000u64.into());
863        let msgbus = Rc::new(RefCell::new(MessageBus::new(
864            trader_id,
865            instance_id,
866            Some("test".to_string()),
867            None,
868        )));
869        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
870        let portfolio = Rc::new(RefCell::new(Portfolio::new(
871            cache.clone(),
872            clock.clone() as Rc<RefCell<dyn Clock>>,
873            None,
874        )));
875        let data_engine = Rc::new(RefCell::new(DataEngine::new(
876            clock.clone(),
877            cache.clone(),
878            Some(DataEngineConfig::default()),
879        )));
880
881        // Create separate cache and clock instances for RiskEngine to avoid borrowing conflicts
882        let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
883        let risk_clock = Rc::new(RefCell::new(TestClock::new()));
884        let risk_portfolio = Portfolio::new(
885            risk_cache.clone(),
886            risk_clock.clone() as Rc<RefCell<dyn Clock>>,
887            None,
888        );
889        let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
890            RiskEngineConfig::default(),
891            risk_portfolio,
892            risk_clock as Rc<RefCell<dyn Clock>>,
893            risk_cache,
894        )));
895        let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
896            clock.clone(),
897            cache.clone(),
898            Some(ExecutionEngineConfig::default()),
899        )));
900
901        (
902            msgbus,
903            cache,
904            portfolio,
905            data_engine,
906            risk_engine,
907            exec_engine,
908            clock,
909        )
910    }
911
912    #[rstest]
913    fn test_trader_creation() {
914        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
915            create_trader_components();
916        let trader_id = TraderId::test_default();
917        let instance_id = UUID4::new();
918
919        let trader = Trader::new(
920            trader_id,
921            instance_id,
922            Environment::Backtest,
923            clock,
924            cache,
925            portfolio,
926        );
927
928        assert_eq!(trader.trader_id(), trader_id);
929        assert_eq!(trader.instance_id(), instance_id);
930        assert_eq!(trader.environment(), Environment::Backtest);
931        assert_eq!(trader.state(), ComponentState::PreInitialized);
932        assert_eq!(trader.actor_count(), 0);
933        assert_eq!(trader.strategy_count(), 0);
934        assert_eq!(trader.exec_algorithm_count(), 0);
935        assert_eq!(trader.component_count(), 0);
936        assert!(!trader.is_running());
937        assert!(!trader.is_stopped());
938        assert!(!trader.is_disposed());
939        assert!(trader.ts_created() > 0);
940        assert!(trader.ts_started().is_none());
941        assert!(trader.ts_stopped().is_none());
942    }
943
944    #[rstest]
945    fn test_trader_component_id() {
946        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
947            create_trader_components();
948        let trader_id = TraderId::from("TRADER-001");
949        let instance_id = UUID4::new();
950
951        let trader = Trader::new(
952            trader_id,
953            instance_id,
954            Environment::Backtest,
955            clock,
956            cache,
957            portfolio,
958        );
959
960        assert_eq!(
961            trader.component_id(),
962            ComponentId::from("Trader-TRADER-001")
963        );
964    }
965
966    #[rstest]
967    fn test_add_actor_success() {
968        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
969            create_trader_components();
970        let trader_id = TraderId::test_default();
971        let instance_id = UUID4::new();
972
973        let mut trader = Trader::new(
974            trader_id,
975            instance_id,
976            Environment::Backtest,
977            clock,
978            cache,
979            portfolio,
980        );
981
982        let actor = TestDataActor::new(DataActorConfig::default());
983        let actor_id = actor.actor_id();
984
985        let result = trader.add_actor(actor);
986        assert!(result.is_ok());
987        assert_eq!(trader.actor_count(), 1);
988        assert_eq!(trader.component_count(), 1);
989        assert!(trader.actor_ids().contains(&actor_id));
990    }
991
992    #[rstest]
993    fn test_add_duplicate_actor_fails() {
994        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
995            create_trader_components();
996        let trader_id = TraderId::test_default();
997        let instance_id = UUID4::new();
998
999        let mut trader = Trader::new(
1000            trader_id,
1001            instance_id,
1002            Environment::Backtest,
1003            clock,
1004            cache,
1005            portfolio,
1006        );
1007
1008        let config = DataActorConfig {
1009            actor_id: Some(ActorId::from("TestActor")),
1010            ..Default::default()
1011        };
1012        let actor1 = TestDataActor::new(config.clone());
1013        let actor2 = TestDataActor::new(config);
1014
1015        // First addition should succeed
1016        assert!(trader.add_actor(actor1).is_ok());
1017        assert_eq!(trader.actor_count(), 1);
1018
1019        // Second addition should fail
1020        let result = trader.add_actor(actor2);
1021        assert!(result.is_err());
1022        assert!(
1023            result
1024                .unwrap_err()
1025                .to_string()
1026                .contains("already registered")
1027        );
1028        assert_eq!(trader.actor_count(), 1);
1029    }
1030
1031    #[rstest]
1032    fn test_add_strategy_success() {
1033        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1034            create_trader_components();
1035        let trader_id = TraderId::test_default();
1036        let instance_id = UUID4::new();
1037
1038        let mut trader = Trader::new(
1039            trader_id,
1040            instance_id,
1041            Environment::Backtest,
1042            clock,
1043            cache,
1044            portfolio,
1045        );
1046
1047        let config = StrategyConfig {
1048            strategy_id: Some(StrategyId::from("Test-Strategy")),
1049            ..Default::default()
1050        };
1051        let strategy = TestStrategy::new(config);
1052        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1053
1054        let result = trader.add_strategy(strategy);
1055        assert!(result.is_ok());
1056        assert_eq!(trader.strategy_count(), 1);
1057        assert_eq!(trader.component_count(), 1);
1058        assert!(trader.strategy_ids().contains(&strategy_id));
1059    }
1060
1061    #[rstest]
1062    fn test_add_exec_algorithm_success() {
1063        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1064            create_trader_components();
1065        let trader_id = TraderId::test_default();
1066        let instance_id = UUID4::new();
1067
1068        let mut trader = Trader::new(
1069            trader_id,
1070            instance_id,
1071            Environment::Backtest,
1072            clock,
1073            cache,
1074            portfolio,
1075        );
1076
1077        let config = DataActorConfig {
1078            actor_id: Some(ActorId::from("TestExecAlgorithm")),
1079            ..Default::default()
1080        };
1081        let exec_algorithm = TestDataActor::new(config);
1082        let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
1083
1084        let result = trader.add_exec_algorithm(exec_algorithm);
1085        assert!(result.is_ok());
1086        assert_eq!(trader.exec_algorithm_count(), 1);
1087        assert_eq!(trader.component_count(), 1);
1088        assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
1089    }
1090
1091    #[rstest]
1092    fn test_component_lifecycle() {
1093        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1094            create_trader_components();
1095        let trader_id = TraderId::test_default();
1096        let instance_id = UUID4::new();
1097
1098        let mut trader = Trader::new(
1099            trader_id,
1100            instance_id,
1101            Environment::Backtest,
1102            clock,
1103            cache,
1104            portfolio,
1105        );
1106
1107        // Add components
1108        let actor = TestDataActor::new(DataActorConfig::default());
1109
1110        let strategy_config = StrategyConfig {
1111            strategy_id: Some(StrategyId::from("Test-Strategy")),
1112            ..Default::default()
1113        };
1114        let strategy = TestStrategy::new(strategy_config);
1115
1116        let exec_algorithm_config = DataActorConfig {
1117            actor_id: Some(ActorId::from("TestExecAlgorithm")),
1118            ..Default::default()
1119        };
1120        let exec_algorithm = TestDataActor::new(exec_algorithm_config);
1121
1122        assert!(trader.add_actor(actor).is_ok());
1123        assert!(trader.add_strategy(strategy).is_ok());
1124        assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1125        assert_eq!(trader.component_count(), 3);
1126
1127        // Test start components
1128        let start_result = trader.start_components();
1129        assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1130
1131        // Test stop components
1132        assert!(trader.stop_components().is_ok());
1133
1134        // Test reset components
1135        assert!(trader.reset_components().is_ok());
1136
1137        // Test dispose components
1138        assert!(trader.dispose_components().is_ok());
1139        assert_eq!(trader.component_count(), 0);
1140    }
1141
1142    #[rstest]
1143    fn test_trader_component_lifecycle() {
1144        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1145            create_trader_components();
1146        let trader_id = TraderId::test_default();
1147        let instance_id = UUID4::new();
1148
1149        let mut trader = Trader::new(
1150            trader_id,
1151            instance_id,
1152            Environment::Backtest,
1153            clock,
1154            cache,
1155            portfolio,
1156        );
1157
1158        // Initially pre-initialized
1159        assert_eq!(trader.state(), ComponentState::PreInitialized);
1160        assert!(!trader.is_running());
1161        assert!(!trader.is_stopped());
1162        assert!(!trader.is_disposed());
1163
1164        // Cannot start from pre-initialized state
1165        assert!(trader.start().is_err());
1166
1167        // Simulate initialization (normally done by kernel)
1168        trader.initialize().unwrap();
1169
1170        // Test start
1171        assert!(trader.start().is_ok());
1172        assert_eq!(trader.state(), ComponentState::Running);
1173        assert!(trader.is_running());
1174        assert!(trader.ts_started().is_some());
1175
1176        // Test stop
1177        assert!(trader.stop().is_ok());
1178        assert_eq!(trader.state(), ComponentState::Stopped);
1179        assert!(trader.is_stopped());
1180        assert!(trader.ts_stopped().is_some());
1181
1182        // Test reset
1183        assert!(trader.reset().is_ok());
1184        assert_eq!(trader.state(), ComponentState::Ready);
1185        assert!(trader.ts_started().is_none());
1186        assert!(trader.ts_stopped().is_none());
1187
1188        // Test dispose
1189        assert!(trader.dispose().is_ok());
1190        assert_eq!(trader.state(), ComponentState::Disposed);
1191        assert!(trader.is_disposed());
1192    }
1193
1194    #[rstest]
1195    fn test_cannot_add_components_while_running() {
1196        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1197            create_trader_components();
1198        let trader_id = TraderId::test_default();
1199        let instance_id = UUID4::new();
1200
1201        let mut trader = Trader::new(
1202            trader_id,
1203            instance_id,
1204            Environment::Backtest,
1205            clock,
1206            cache,
1207            portfolio,
1208        );
1209
1210        // Simulate running state
1211        trader.state = ComponentState::Running;
1212
1213        let actor = TestDataActor::new(DataActorConfig::default());
1214        let result = trader.add_actor(actor);
1215        assert!(result.is_err());
1216        assert!(
1217            result
1218                .unwrap_err()
1219                .to_string()
1220                .contains("while trader is running")
1221        );
1222    }
1223
1224    #[rstest]
1225    fn test_create_component_clock_backtest_vs_live() {
1226        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1227            create_trader_components();
1228        let trader_id = TraderId::test_default();
1229        let instance_id = UUID4::new();
1230
1231        // Test backtest environment - should create individual test clocks
1232        let trader_backtest = Trader::new(
1233            trader_id,
1234            instance_id,
1235            Environment::Backtest,
1236            clock.clone(),
1237            cache.clone(),
1238            portfolio.clone(),
1239        );
1240
1241        let backtest_clock = trader_backtest.create_component_clock();
1242        // In backtest, component clock should be different from system clock
1243        assert_ne!(
1244            backtest_clock.as_ptr() as *const _,
1245            clock.as_ptr() as *const _
1246        );
1247
1248        // Test live environment - should share system clock
1249        let trader_live = Trader::new(
1250            trader_id,
1251            instance_id,
1252            Environment::Live,
1253            clock.clone(),
1254            cache,
1255            portfolio,
1256        );
1257
1258        let live_clock = trader_live.create_component_clock();
1259        // In live, component clock should be same as system clock
1260        assert_eq!(live_clock.as_ptr() as *const _, clock.as_ptr() as *const _);
1261    }
1262
1263    #[rstest]
1264    fn test_clear_strategies_preserves_other_handlers() {
1265        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1266            create_trader_components();
1267        let trader_id = TraderId::test_default();
1268        let instance_id = UUID4::new();
1269
1270        let mut trader = Trader::new(
1271            trader_id,
1272            instance_id,
1273            Environment::Backtest,
1274            clock,
1275            cache,
1276            portfolio,
1277        );
1278
1279        let config = StrategyConfig {
1280            strategy_id: Some(StrategyId::from("Test-Strategy")),
1281            ..Default::default()
1282        };
1283        let strategy = TestStrategy::new(config);
1284        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1285        trader.add_strategy(strategy).unwrap();
1286
1287        // Simulate an exec algorithm subscribing to the same strategy topic
1288        let ext_received = Rc::new(RefCell::new(0));
1289        let ext_clone = ext_received.clone();
1290        let ext_handler =
1291            TypedHandler::from_with_id("exec-algo-handler", move |_: &OrderEventAny| {
1292                *ext_clone.borrow_mut() += 1;
1293            });
1294        let order_topic = get_event_orders_topic(strategy_id);
1295        msgbus::subscribe_order_events(order_topic.into(), ext_handler, None);
1296
1297        trader.clear_strategies().unwrap();
1298        assert_eq!(trader.strategy_count(), 0);
1299
1300        let event = OrderEventAny::Accepted(OrderAccepted::test_default());
1301        msgbus::publish_order_event(order_topic, &event);
1302        assert_eq!(*ext_received.borrow(), 1);
1303    }
1304}