nautilus_trading/algorithm/
core.rs1use std::{
19 cell::RefCell,
20 fmt::Debug,
21 ops::{Deref, DerefMut},
22 rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use nautilus_common::{
27 actor::{DataActorConfig, DataActorCore},
28 cache::Cache,
29 clock::Clock,
30 msgbus::TypedHandler,
31};
32use nautilus_model::{
33 events::{OrderEventAny, PositionEvent},
34 identifiers::{ActorId, ClientOrderId, ExecAlgorithmId, StrategyId, TraderId},
35 orders::{OrderAny, OrderList},
36 types::Quantity,
37};
38
39use super::config::ExecutionAlgorithmConfig;
40
41#[derive(Clone, Debug)]
43pub struct StrategyEventHandlers {
44 pub order_topic: String,
46 pub order_handler: TypedHandler<OrderEventAny>,
48 pub position_topic: String,
50 pub position_handler: TypedHandler<PositionEvent>,
52}
53
54pub struct ExecutionAlgorithmCore {
63 pub actor: DataActorCore,
65 pub config: ExecutionAlgorithmConfig,
67 pub exec_algorithm_id: ExecAlgorithmId,
69 exec_spawn_ids: AHashMap<ClientOrderId, u32>,
71 subscribed_strategies: AHashSet<StrategyId>,
73 pending_spawn_reductions: AHashMap<ClientOrderId, Quantity>,
75 strategy_event_handlers: AHashMap<StrategyId, StrategyEventHandlers>,
77}
78
79impl Debug for ExecutionAlgorithmCore {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct(stringify!(ExecutionAlgorithmCore))
82 .field("actor", &self.actor)
83 .field("config", &self.config)
84 .field("exec_algorithm_id", &self.exec_algorithm_id)
85 .field("exec_spawn_ids", &self.exec_spawn_ids.len())
86 .field("subscribed_strategies", &self.subscribed_strategies.len())
87 .field(
88 "pending_spawn_reductions",
89 &self.pending_spawn_reductions.len(),
90 )
91 .field(
92 "strategy_event_handlers",
93 &self.strategy_event_handlers.len(),
94 )
95 .finish()
96 }
97}
98
99impl ExecutionAlgorithmCore {
100 #[must_use]
106 pub fn new(config: ExecutionAlgorithmConfig) -> Self {
107 let exec_algorithm_id = config
108 .exec_algorithm_id
109 .expect("ExecutionAlgorithmConfig must have exec_algorithm_id set");
110
111 let actor_config = DataActorConfig {
112 actor_id: Some(ActorId::from(exec_algorithm_id.inner().as_str())),
113 log_events: config.log_events,
114 log_commands: config.log_commands,
115 };
116
117 Self {
118 actor: DataActorCore::new(actor_config),
119 config,
120 exec_algorithm_id,
121 exec_spawn_ids: AHashMap::new(),
122 subscribed_strategies: AHashSet::new(),
123 pending_spawn_reductions: AHashMap::new(),
124 strategy_event_handlers: AHashMap::new(),
125 }
126 }
127
128 pub fn register(
134 &mut self,
135 trader_id: TraderId,
136 clock: Rc<RefCell<dyn Clock>>,
137 cache: Rc<RefCell<Cache>>,
138 ) -> anyhow::Result<()> {
139 self.actor.register(trader_id, clock, cache)
140 }
141
142 #[must_use]
144 pub fn id(&self) -> ExecAlgorithmId {
145 self.exec_algorithm_id
146 }
147
148 #[must_use]
152 pub fn spawn_client_order_id(&mut self, primary_id: &ClientOrderId) -> ClientOrderId {
153 let sequence = self
154 .exec_spawn_ids
155 .entry(*primary_id)
156 .and_modify(|s| *s += 1)
157 .or_insert(1);
158
159 ClientOrderId::new(format!("{primary_id}-E{sequence}"))
160 }
161
162 #[must_use]
164 pub fn spawn_sequence(&self, primary_id: &ClientOrderId) -> Option<u32> {
165 self.exec_spawn_ids.get(primary_id).copied()
166 }
167
168 #[must_use]
170 pub fn is_strategy_subscribed(&self, strategy_id: &StrategyId) -> bool {
171 self.subscribed_strategies.contains(strategy_id)
172 }
173
174 pub fn add_subscribed_strategy(&mut self, strategy_id: StrategyId) {
176 self.subscribed_strategies.insert(strategy_id);
177 }
178
179 pub fn store_strategy_event_handlers(
181 &mut self,
182 strategy_id: StrategyId,
183 handlers: StrategyEventHandlers,
184 ) {
185 self.strategy_event_handlers.insert(strategy_id, handlers);
186 }
187
188 pub fn take_strategy_event_handlers(&mut self) -> AHashMap<StrategyId, StrategyEventHandlers> {
190 std::mem::take(&mut self.strategy_event_handlers)
191 }
192
193 pub fn clear_spawn_ids(&mut self) {
195 self.exec_spawn_ids.clear();
196 }
197
198 pub fn clear_subscribed_strategies(&mut self) {
200 self.subscribed_strategies.clear();
201 }
202
203 pub fn track_pending_spawn_reduction(&mut self, spawn_id: ClientOrderId, quantity: Quantity) {
205 self.pending_spawn_reductions.insert(spawn_id, quantity);
206 }
207
208 pub fn take_pending_spawn_reduction(&mut self, spawn_id: &ClientOrderId) -> Option<Quantity> {
210 self.pending_spawn_reductions.remove(spawn_id)
211 }
212
213 pub fn clear_pending_spawn_reductions(&mut self) {
215 self.pending_spawn_reductions.clear();
216 }
217
218 pub fn reset(&mut self) {
223 self.exec_spawn_ids.clear();
224 self.subscribed_strategies.clear();
225 self.pending_spawn_reductions.clear();
226 self.strategy_event_handlers.clear();
227 }
228
229 pub fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
235 self.cache()
236 .order(client_order_id)
237 .cloned()
238 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
239 }
240
241 pub fn get_orders_for_list(&self, order_list: &OrderList) -> anyhow::Result<Vec<OrderAny>> {
247 order_list
248 .client_order_ids
249 .iter()
250 .map(|id| self.get_order(id))
251 .collect()
252 }
253}
254
255impl Deref for ExecutionAlgorithmCore {
256 type Target = DataActorCore;
257 fn deref(&self) -> &Self::Target {
258 &self.actor
259 }
260}
261
262impl DerefMut for ExecutionAlgorithmCore {
263 fn deref_mut(&mut self) -> &mut Self::Target {
264 &mut self.actor
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use rstest::rstest;
271
272 use super::*;
273
274 fn create_test_config() -> ExecutionAlgorithmConfig {
275 ExecutionAlgorithmConfig {
276 exec_algorithm_id: Some(ExecAlgorithmId::new("TWAP")),
277 ..Default::default()
278 }
279 }
280
281 #[rstest]
282 fn test_core_new() {
283 let config = create_test_config();
284 let core = ExecutionAlgorithmCore::new(config.clone());
285
286 assert_eq!(core.exec_algorithm_id, ExecAlgorithmId::new("TWAP"));
287 assert_eq!(core.config.log_events, config.log_events);
288 assert!(core.exec_spawn_ids.is_empty());
289 assert!(core.subscribed_strategies.is_empty());
290 }
291
292 #[rstest]
293 fn test_spawn_client_order_id_sequence() {
294 let config = create_test_config();
295 let mut core = ExecutionAlgorithmCore::new(config);
296
297 let primary_id = ClientOrderId::new("O-001");
298
299 let spawn1 = core.spawn_client_order_id(&primary_id);
300 assert_eq!(spawn1.as_str(), "O-001-E1");
301
302 let spawn2 = core.spawn_client_order_id(&primary_id);
303 assert_eq!(spawn2.as_str(), "O-001-E2");
304
305 let spawn3 = core.spawn_client_order_id(&primary_id);
306 assert_eq!(spawn3.as_str(), "O-001-E3");
307 }
308
309 #[rstest]
310 fn test_spawn_client_order_id_different_primaries() {
311 let config = create_test_config();
312 let mut core = ExecutionAlgorithmCore::new(config);
313
314 let primary1 = ClientOrderId::new("O-001");
315 let primary2 = ClientOrderId::new("O-002");
316
317 let spawn1_1 = core.spawn_client_order_id(&primary1);
318 let spawn2_1 = core.spawn_client_order_id(&primary2);
319 let spawn1_2 = core.spawn_client_order_id(&primary1);
320
321 assert_eq!(spawn1_1.as_str(), "O-001-E1");
322 assert_eq!(spawn2_1.as_str(), "O-002-E1");
323 assert_eq!(spawn1_2.as_str(), "O-001-E2");
324 }
325
326 #[rstest]
327 fn test_spawn_sequence() {
328 let config = create_test_config();
329 let mut core = ExecutionAlgorithmCore::new(config);
330
331 let primary_id = ClientOrderId::new("O-001");
332
333 assert_eq!(core.spawn_sequence(&primary_id), None);
334
335 let _ = core.spawn_client_order_id(&primary_id);
336 assert_eq!(core.spawn_sequence(&primary_id), Some(1));
337
338 let _ = core.spawn_client_order_id(&primary_id);
339 assert_eq!(core.spawn_sequence(&primary_id), Some(2));
340 }
341
342 #[rstest]
343 fn test_strategy_subscription_tracking() {
344 let config = create_test_config();
345 let mut core = ExecutionAlgorithmCore::new(config);
346
347 let strategy_id = StrategyId::new("TEST-001");
348
349 assert!(!core.is_strategy_subscribed(&strategy_id));
350
351 core.add_subscribed_strategy(strategy_id);
352 assert!(core.is_strategy_subscribed(&strategy_id));
353 }
354
355 #[rstest]
356 fn test_clear_spawn_ids() {
357 let config = create_test_config();
358 let mut core = ExecutionAlgorithmCore::new(config);
359
360 let primary_id = ClientOrderId::new("O-001");
361 let _ = core.spawn_client_order_id(&primary_id);
362
363 assert!(core.spawn_sequence(&primary_id).is_some());
364
365 core.clear_spawn_ids();
366 assert!(core.spawn_sequence(&primary_id).is_none());
367 }
368
369 #[rstest]
370 fn test_reset() {
371 let config = create_test_config();
372 let mut core = ExecutionAlgorithmCore::new(config);
373
374 let primary_id = ClientOrderId::new("O-001");
375 let strategy_id = StrategyId::new("TEST-001");
376
377 let _ = core.spawn_client_order_id(&primary_id);
378 core.add_subscribed_strategy(strategy_id);
379
380 core.reset();
381
382 assert!(core.spawn_sequence(&primary_id).is_none());
383 assert!(!core.is_strategy_subscribed(&strategy_id));
384 }
385
386 #[rstest]
387 fn test_deref_to_data_actor_core() {
388 let config = create_test_config();
389 let core = ExecutionAlgorithmCore::new(config);
390
391 assert!(core.trader_id().is_none());
393 }
394}