Skip to main content

nautilus_trading/algorithm/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core component for execution algorithms.
17
18use std::{
19    cell::RefCell,
20    fmt::Debug,
21    ops::{Deref, DerefMut},
22    rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use nautilus_common::{
27    actor::{DataActorConfig, DataActorCore},
28    cache::Cache,
29    clock::Clock,
30    msgbus::TypedHandler,
31};
32use nautilus_model::{
33    events::{OrderEventAny, PositionEvent},
34    identifiers::{ActorId, ClientOrderId, ExecAlgorithmId, StrategyId, TraderId},
35    orders::{OrderAny, OrderList},
36    types::Quantity,
37};
38
39use super::config::ExecutionAlgorithmConfig;
40
41/// Holds event handlers for strategy event subscriptions.
42#[derive(Clone, Debug)]
43pub struct StrategyEventHandlers {
44    /// The topic string for order events.
45    pub order_topic: String,
46    /// The handler for order events.
47    pub order_handler: TypedHandler<OrderEventAny>,
48    /// The topic string for position events.
49    pub position_topic: String,
50    /// The handler for position events.
51    pub position_handler: TypedHandler<PositionEvent>,
52}
53
54/// The core component of an [`ExecutionAlgorithm`](super::ExecutionAlgorithm).
55///
56/// This struct manages the internal state for execution algorithms including
57/// spawn ID tracking and strategy subscriptions. It wraps a [`DataActorCore`]
58/// to provide data actor capabilities.
59///
60/// User algorithms should hold this as a member and implement `Deref`/`DerefMut`
61/// to satisfy the trait bounds of [`ExecutionAlgorithm`](super::ExecutionAlgorithm).
62pub struct ExecutionAlgorithmCore {
63    /// The underlying data actor core.
64    pub actor: DataActorCore,
65    /// The execution algorithm configuration.
66    pub config: ExecutionAlgorithmConfig,
67    /// The execution algorithm ID.
68    pub exec_algorithm_id: ExecAlgorithmId,
69    /// Maps primary order client IDs to their spawn sequence counter.
70    exec_spawn_ids: AHashMap<ClientOrderId, u32>,
71    /// Tracks strategies that have been subscribed to for events.
72    subscribed_strategies: AHashSet<StrategyId>,
73    /// Tracks pending spawn reductions for quantity restoration on denial/rejection.
74    pending_spawn_reductions: AHashMap<ClientOrderId, Quantity>,
75    /// Maps strategies to their event handlers for cleanup on reset.
76    strategy_event_handlers: AHashMap<StrategyId, StrategyEventHandlers>,
77}
78
79impl Debug for ExecutionAlgorithmCore {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct(stringify!(ExecutionAlgorithmCore))
82            .field("actor", &self.actor)
83            .field("config", &self.config)
84            .field("exec_algorithm_id", &self.exec_algorithm_id)
85            .field("exec_spawn_ids", &self.exec_spawn_ids.len())
86            .field("subscribed_strategies", &self.subscribed_strategies.len())
87            .field(
88                "pending_spawn_reductions",
89                &self.pending_spawn_reductions.len(),
90            )
91            .field(
92                "strategy_event_handlers",
93                &self.strategy_event_handlers.len(),
94            )
95            .finish()
96    }
97}
98
99impl ExecutionAlgorithmCore {
100    /// Creates a new [`ExecutionAlgorithmCore`] instance.
101    ///
102    /// # Panics
103    ///
104    /// Panics if `config.exec_algorithm_id` is `None`.
105    #[must_use]
106    pub fn new(config: ExecutionAlgorithmConfig) -> Self {
107        let exec_algorithm_id = config
108            .exec_algorithm_id
109            .expect("ExecutionAlgorithmConfig must have exec_algorithm_id set");
110
111        let actor_config = DataActorConfig {
112            actor_id: Some(ActorId::from(exec_algorithm_id.inner().as_str())),
113            log_events: config.log_events,
114            log_commands: config.log_commands,
115        };
116
117        Self {
118            actor: DataActorCore::new(actor_config),
119            config,
120            exec_algorithm_id,
121            exec_spawn_ids: AHashMap::new(),
122            subscribed_strategies: AHashSet::new(),
123            pending_spawn_reductions: AHashMap::new(),
124            strategy_event_handlers: AHashMap::new(),
125        }
126    }
127
128    /// Registers the execution algorithm with the trading engine components.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if registration with the actor core fails.
133    pub fn register(
134        &mut self,
135        trader_id: TraderId,
136        clock: Rc<RefCell<dyn Clock>>,
137        cache: Rc<RefCell<Cache>>,
138    ) -> anyhow::Result<()> {
139        self.actor.register(trader_id, clock, cache)
140    }
141
142    /// Returns the execution algorithm ID.
143    #[must_use]
144    pub fn id(&self) -> ExecAlgorithmId {
145        self.exec_algorithm_id
146    }
147
148    /// Generates the next spawn client order ID for a primary order.
149    ///
150    /// The generated ID follows the pattern: `{primary_id}-E{sequence}`.
151    #[must_use]
152    pub fn spawn_client_order_id(&mut self, primary_id: &ClientOrderId) -> ClientOrderId {
153        let sequence = self
154            .exec_spawn_ids
155            .entry(*primary_id)
156            .and_modify(|s| *s += 1)
157            .or_insert(1);
158
159        ClientOrderId::new(format!("{primary_id}-E{sequence}"))
160    }
161
162    /// Returns the current spawn sequence for a primary order, if any.
163    #[must_use]
164    pub fn spawn_sequence(&self, primary_id: &ClientOrderId) -> Option<u32> {
165        self.exec_spawn_ids.get(primary_id).copied()
166    }
167
168    /// Checks if a strategy has been subscribed to for events.
169    #[must_use]
170    pub fn is_strategy_subscribed(&self, strategy_id: &StrategyId) -> bool {
171        self.subscribed_strategies.contains(strategy_id)
172    }
173
174    /// Marks a strategy as subscribed for events.
175    pub fn add_subscribed_strategy(&mut self, strategy_id: StrategyId) {
176        self.subscribed_strategies.insert(strategy_id);
177    }
178
179    /// Stores the event handlers for a strategy subscription.
180    pub fn store_strategy_event_handlers(
181        &mut self,
182        strategy_id: StrategyId,
183        handlers: StrategyEventHandlers,
184    ) {
185        self.strategy_event_handlers.insert(strategy_id, handlers);
186    }
187
188    /// Takes and returns all stored strategy event handlers, clearing the internal map.
189    pub fn take_strategy_event_handlers(&mut self) -> AHashMap<StrategyId, StrategyEventHandlers> {
190        std::mem::take(&mut self.strategy_event_handlers)
191    }
192
193    /// Clears all spawn tracking state.
194    pub fn clear_spawn_ids(&mut self) {
195        self.exec_spawn_ids.clear();
196    }
197
198    /// Clears all strategy subscriptions.
199    pub fn clear_subscribed_strategies(&mut self) {
200        self.subscribed_strategies.clear();
201    }
202
203    /// Tracks a pending spawn reduction for potential restoration.
204    pub fn track_pending_spawn_reduction(&mut self, spawn_id: ClientOrderId, quantity: Quantity) {
205        self.pending_spawn_reductions.insert(spawn_id, quantity);
206    }
207
208    /// Removes and returns the pending spawn reduction for an order, if any.
209    pub fn take_pending_spawn_reduction(&mut self, spawn_id: &ClientOrderId) -> Option<Quantity> {
210        self.pending_spawn_reductions.remove(spawn_id)
211    }
212
213    /// Clears all pending spawn reductions.
214    pub fn clear_pending_spawn_reductions(&mut self) {
215        self.pending_spawn_reductions.clear();
216    }
217
218    /// Resets the core to its initial state.
219    ///
220    /// Note: This clears handler storage but does NOT unsubscribe from msgbus.
221    /// Call `unsubscribe_all_strategy_events` first to properly unsubscribe.
222    pub fn reset(&mut self) {
223        self.exec_spawn_ids.clear();
224        self.subscribed_strategies.clear();
225        self.pending_spawn_reductions.clear();
226        self.strategy_event_handlers.clear();
227    }
228
229    /// Returns the order for the given client order ID from the cache.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the order is not found in the cache.
234    pub fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
235        self.cache()
236            .order(client_order_id)
237            .cloned()
238            .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
239    }
240
241    /// Returns all orders for the given order list from the cache.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if any order is not found in the cache.
246    pub fn get_orders_for_list(&self, order_list: &OrderList) -> anyhow::Result<Vec<OrderAny>> {
247        order_list
248            .client_order_ids
249            .iter()
250            .map(|id| self.get_order(id))
251            .collect()
252    }
253}
254
255impl Deref for ExecutionAlgorithmCore {
256    type Target = DataActorCore;
257    fn deref(&self) -> &Self::Target {
258        &self.actor
259    }
260}
261
262impl DerefMut for ExecutionAlgorithmCore {
263    fn deref_mut(&mut self) -> &mut Self::Target {
264        &mut self.actor
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    use rstest::rstest;
271
272    use super::*;
273
274    fn create_test_config() -> ExecutionAlgorithmConfig {
275        ExecutionAlgorithmConfig {
276            exec_algorithm_id: Some(ExecAlgorithmId::new("TWAP")),
277            ..Default::default()
278        }
279    }
280
281    #[rstest]
282    fn test_core_new() {
283        let config = create_test_config();
284        let core = ExecutionAlgorithmCore::new(config.clone());
285
286        assert_eq!(core.exec_algorithm_id, ExecAlgorithmId::new("TWAP"));
287        assert_eq!(core.config.log_events, config.log_events);
288        assert!(core.exec_spawn_ids.is_empty());
289        assert!(core.subscribed_strategies.is_empty());
290    }
291
292    #[rstest]
293    fn test_spawn_client_order_id_sequence() {
294        let config = create_test_config();
295        let mut core = ExecutionAlgorithmCore::new(config);
296
297        let primary_id = ClientOrderId::new("O-001");
298
299        let spawn1 = core.spawn_client_order_id(&primary_id);
300        assert_eq!(spawn1.as_str(), "O-001-E1");
301
302        let spawn2 = core.spawn_client_order_id(&primary_id);
303        assert_eq!(spawn2.as_str(), "O-001-E2");
304
305        let spawn3 = core.spawn_client_order_id(&primary_id);
306        assert_eq!(spawn3.as_str(), "O-001-E3");
307    }
308
309    #[rstest]
310    fn test_spawn_client_order_id_different_primaries() {
311        let config = create_test_config();
312        let mut core = ExecutionAlgorithmCore::new(config);
313
314        let primary1 = ClientOrderId::new("O-001");
315        let primary2 = ClientOrderId::new("O-002");
316
317        let spawn1_1 = core.spawn_client_order_id(&primary1);
318        let spawn2_1 = core.spawn_client_order_id(&primary2);
319        let spawn1_2 = core.spawn_client_order_id(&primary1);
320
321        assert_eq!(spawn1_1.as_str(), "O-001-E1");
322        assert_eq!(spawn2_1.as_str(), "O-002-E1");
323        assert_eq!(spawn1_2.as_str(), "O-001-E2");
324    }
325
326    #[rstest]
327    fn test_spawn_sequence() {
328        let config = create_test_config();
329        let mut core = ExecutionAlgorithmCore::new(config);
330
331        let primary_id = ClientOrderId::new("O-001");
332
333        assert_eq!(core.spawn_sequence(&primary_id), None);
334
335        let _ = core.spawn_client_order_id(&primary_id);
336        assert_eq!(core.spawn_sequence(&primary_id), Some(1));
337
338        let _ = core.spawn_client_order_id(&primary_id);
339        assert_eq!(core.spawn_sequence(&primary_id), Some(2));
340    }
341
342    #[rstest]
343    fn test_strategy_subscription_tracking() {
344        let config = create_test_config();
345        let mut core = ExecutionAlgorithmCore::new(config);
346
347        let strategy_id = StrategyId::new("TEST-001");
348
349        assert!(!core.is_strategy_subscribed(&strategy_id));
350
351        core.add_subscribed_strategy(strategy_id);
352        assert!(core.is_strategy_subscribed(&strategy_id));
353    }
354
355    #[rstest]
356    fn test_clear_spawn_ids() {
357        let config = create_test_config();
358        let mut core = ExecutionAlgorithmCore::new(config);
359
360        let primary_id = ClientOrderId::new("O-001");
361        let _ = core.spawn_client_order_id(&primary_id);
362
363        assert!(core.spawn_sequence(&primary_id).is_some());
364
365        core.clear_spawn_ids();
366        assert!(core.spawn_sequence(&primary_id).is_none());
367    }
368
369    #[rstest]
370    fn test_reset() {
371        let config = create_test_config();
372        let mut core = ExecutionAlgorithmCore::new(config);
373
374        let primary_id = ClientOrderId::new("O-001");
375        let strategy_id = StrategyId::new("TEST-001");
376
377        let _ = core.spawn_client_order_id(&primary_id);
378        core.add_subscribed_strategy(strategy_id);
379
380        core.reset();
381
382        assert!(core.spawn_sequence(&primary_id).is_none());
383        assert!(!core.is_strategy_subscribed(&strategy_id));
384    }
385
386    #[rstest]
387    fn test_deref_to_data_actor_core() {
388        let config = create_test_config();
389        let core = ExecutionAlgorithmCore::new(config);
390
391        // Should be able to access DataActorCore methods via Deref
392        assert!(core.trader_id().is_none());
393    }
394}