nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    logging::{CMD, EVT, SEND},
27    messages::execution::{SubmitOrder, TradingCommand},
28    msgbus,
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32    enums::{ContingencyType, TriggerType},
33    events::{
34        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
35    },
36    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
37    orders::{Order, OrderAny},
38    types::Quantity,
39};
40
41/// Manages the lifecycle and state of orders with contingency handling.
42///
43/// The order manager is responsible for managing local order state, handling
44/// contingent orders (OTO, OCO, OUO), and coordinating with emulation and
45/// execution systems. It tracks order commands and manages complex order
46/// relationships for advanced order types.
47pub struct OrderManager {
48    clock: Rc<RefCell<dyn Clock>>,
49    cache: Rc<RefCell<Cache>>,
50    active_local: bool,
51    // submit_order_handler: Option<SubmitOrderHandlerAny>,
52    // cancel_order_handler: Option<CancelOrderHandlerAny>,
53    // modify_order_handler: Option<ModifyOrderHandlerAny>,
54    submit_order_commands: AHashMap<ClientOrderId, SubmitOrder>,
55}
56
57impl Debug for OrderManager {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct(stringify!(OrderManager))
60            .field("pending_commands", &self.submit_order_commands.len())
61            .finish()
62    }
63}
64
65impl OrderManager {
66    /// Creates a new [`OrderManager`] instance.
67    pub fn new(
68        clock: Rc<RefCell<dyn Clock>>,
69        cache: Rc<RefCell<Cache>>,
70        active_local: bool,
71        // submit_order_handler: Option<SubmitOrderHandlerAny>,
72        // cancel_order_handler: Option<CancelOrderHandlerAny>,
73        // modify_order_handler: Option<ModifyOrderHandlerAny>,
74    ) -> Self {
75        Self {
76            clock,
77            cache,
78            active_local,
79            // submit_order_handler,
80            // cancel_order_handler,
81            // modify_order_handler,
82            submit_order_commands: AHashMap::new(),
83        }
84    }
85
86    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
87    //     self.submit_order_handler = Some(handler);
88    // }
89    //
90    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
91    //     self.cancel_order_handler = Some(handler);
92    // }
93    //
94    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
95    //     self.modify_order_handler = Some(handler);
96    // }
97
98    #[must_use]
99    /// Returns a copy of all cached submit order commands.
100    pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
101        self.submit_order_commands.clone()
102    }
103
104    /// Caches a submit order command for later processing.
105    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
106        self.submit_order_commands
107            .insert(command.order.client_order_id(), command);
108    }
109
110    /// Removes and returns a cached submit order command.
111    pub fn pop_submit_order_command(
112        &mut self,
113        client_order_id: ClientOrderId,
114    ) -> Option<SubmitOrder> {
115        self.submit_order_commands.remove(&client_order_id)
116    }
117
118    /// Resets the order manager by clearing all cached commands.
119    pub fn reset(&mut self) {
120        self.submit_order_commands.clear();
121    }
122
123    /// Cancels an order if it's not already pending cancellation or closed.
124    pub fn cancel_order(&mut self, order: &OrderAny) {
125        if self
126            .cache
127            .borrow()
128            .is_order_pending_cancel_local(&order.client_order_id())
129        {
130            return;
131        }
132
133        if order.is_closed() {
134            log::warn!("Cannot cancel order: already closed");
135            return;
136        }
137
138        self.submit_order_commands.remove(&order.client_order_id());
139
140        // if let Some(handler) = &self.cancel_order_handler {
141        //     handler.handle_cancel_order(order);
142        // }
143    }
144
145    /// Modifies the quantity of an existing order.
146    pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
147        // if let Some(handler) = &self.modify_order_handler {
148        //     handler.handle_modify_order(order, new_quantity);
149        // }
150    }
151
152    /// # Errors
153    ///
154    /// Returns an error if creating a new submit order fails.
155    pub fn create_new_submit_order(
156        &mut self,
157        order: &OrderAny,
158        position_id: Option<PositionId>,
159        client_id: Option<ClientId>,
160    ) -> anyhow::Result<()> {
161        let submit = SubmitOrder::new(
162            order.trader_id(),
163            client_id,
164            order.strategy_id(),
165            order.instrument_id(),
166            order.clone(),
167            order.exec_algorithm_id(),
168            position_id,
169            None, // params
170            UUID4::new(),
171            self.clock.borrow().timestamp_ns(),
172        );
173
174        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
175            self.cache_submit_order_command(submit.clone());
176
177            match order.exec_algorithm_id() {
178                Some(exec_algorithm_id) => {
179                    self.send_algo_command(submit, exec_algorithm_id);
180                }
181                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
182            }
183        } // else if let Some(handler) = &self.submit_order_handler {
184        //     handler.handle_submit_order(submit);
185        // }
186
187        Ok(())
188    }
189
190    #[must_use]
191    /// Returns true if the order manager should manage the given order.
192    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
193        self.active_local && order.is_active_local()
194    }
195
196    // Event Handlers
197    /// Handles an order event by routing it to the appropriate handler method.
198    ///
199    /// Note: Only handles specific terminal/actionable events. Other events
200    /// like `OrderSubmitted`, `OrderAccepted`, etc. are no-ops for the order manager.
201    pub fn handle_event(&mut self, event: OrderEventAny) {
202        match event {
203            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
204            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
205            OrderEventAny::Expired(event) => self.handle_order_expired(event),
206            OrderEventAny::Updated(event) => self.handle_order_updated(event),
207            OrderEventAny::Filled(event) => self.handle_order_filled(event),
208            _ => {}
209        }
210    }
211
212    /// Handles an order rejected event and manages any contingent orders.
213    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
214        let cloned_order = self
215            .cache
216            .borrow()
217            .order(&rejected.client_order_id)
218            .cloned();
219        if let Some(order) = cloned_order {
220            if order.contingency_type() != Some(ContingencyType::NoContingency) {
221                self.handle_contingencies(order);
222            }
223        } else {
224            log::error!(
225                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
226                rejected.client_order_id,
227                rejected
228            );
229        }
230    }
231
232    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
233        let cloned_order = self
234            .cache
235            .borrow()
236            .order(&canceled.client_order_id)
237            .cloned();
238        if let Some(order) = cloned_order {
239            if order.contingency_type() != Some(ContingencyType::NoContingency) {
240                self.handle_contingencies(order);
241            }
242        } else {
243            log::error!(
244                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
245                canceled.client_order_id,
246                canceled
247            );
248        }
249    }
250
251    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
252        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
253        if let Some(order) = cloned_order {
254            if order.contingency_type() != Some(ContingencyType::NoContingency) {
255                self.handle_contingencies(order);
256            }
257        } else {
258            log::error!(
259                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
260                expired.client_order_id,
261                expired
262            );
263        }
264    }
265
266    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
267        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
268        if let Some(order) = cloned_order {
269            if order.contingency_type() != Some(ContingencyType::NoContingency) {
270                self.handle_contingencies_update(order);
271            }
272        } else {
273            log::error!(
274                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
275                updated.client_order_id,
276                updated
277            );
278        }
279    }
280
281    /// # Panics
282    ///
283    /// Panics if the OTO child order cannot be found for the given client order ID.
284    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
285        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
286        {
287            order
288        } else {
289            log::error!(
290                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
291                filled.client_order_id,
292                filled
293            );
294            return;
295        };
296
297        match order.contingency_type() {
298            Some(ContingencyType::Oto) => {
299                let position_id = self
300                    .cache
301                    .borrow()
302                    .position_id(&order.client_order_id())
303                    .copied();
304                let client_id = self
305                    .cache
306                    .borrow()
307                    .client_id(&order.client_order_id())
308                    .copied();
309
310                let parent_filled_qty = match order.exec_spawn_id() {
311                    Some(spawn_id) => {
312                        if let Some(qty) = self
313                            .cache
314                            .borrow()
315                            .exec_spawn_total_filled_qty(&spawn_id, true)
316                        {
317                            qty
318                        } else {
319                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
320                            return;
321                        }
322                    }
323                    None => order.filled_qty(),
324                };
325
326                let linked_orders = if let Some(orders) = order.linked_order_ids() {
327                    orders
328                } else {
329                    log::error!("No linked orders found for OTO order");
330                    return;
331                };
332
333                for client_order_id in linked_orders {
334                    let mut child_order =
335                        if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
336                            order
337                        } else {
338                            panic!(
339                                "Cannot find OTO child order for client_order_id: {client_order_id}"
340                            );
341                        };
342
343                    if !self.should_manage_order(&child_order) {
344                        continue;
345                    }
346
347                    if child_order.position_id().is_none() {
348                        child_order.set_position_id(position_id);
349                    }
350
351                    if parent_filled_qty != child_order.leaves_qty() {
352                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
353                    }
354
355                    // if self.submit_order_handler.is_none() {
356                    //     return;
357                    // }
358
359                    if !self
360                        .submit_order_commands
361                        .contains_key(&child_order.client_order_id())
362                        && let Err(e) =
363                            self.create_new_submit_order(&child_order, position_id, client_id)
364                    {
365                        log::error!("Failed to create new submit order: {e}");
366                    }
367                }
368            }
369            Some(ContingencyType::Oco) => {
370                let linked_orders = if let Some(orders) = order.linked_order_ids() {
371                    orders
372                } else {
373                    log::error!("No linked orders found for OCO order");
374                    return;
375                };
376
377                for client_order_id in linked_orders {
378                    let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
379                    {
380                        Some(contingent_order) => contingent_order,
381                        None => {
382                            panic!(
383                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
384                            );
385                        }
386                    };
387
388                    // Not being managed || Already completed
389                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
390                    {
391                        continue;
392                    }
393                    if contingent_order.client_order_id() != order.client_order_id() {
394                        self.cancel_order(&contingent_order);
395                    }
396                }
397            }
398            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
399            _ => {}
400        }
401    }
402
403    /// # Panics
404    ///
405    /// Panics if a contingent order cannot be found for the given client order ID.
406    pub fn handle_contingencies(&mut self, order: OrderAny) {
407        let (filled_qty, leaves_qty, is_spawn_active) =
408            if let Some(exec_spawn_id) = order.exec_spawn_id() {
409                if let (Some(filled), Some(leaves)) = (
410                    self.cache
411                        .borrow()
412                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
413                    self.cache
414                        .borrow()
415                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
416                ) {
417                    (filled, leaves, leaves.raw > 0)
418                } else {
419                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
420                    return;
421                }
422            } else {
423                (order.filled_qty(), order.leaves_qty(), false)
424            };
425
426        let linked_orders = if let Some(orders) = order.linked_order_ids() {
427            orders
428        } else {
429            log::error!("No linked orders found");
430            return;
431        };
432
433        for client_order_id in linked_orders {
434            let mut contingent_order =
435                if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
436                    order
437                } else {
438                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
439                };
440
441            if !self.should_manage_order(&contingent_order)
442                || client_order_id == &order.client_order_id()
443            {
444                continue;
445            }
446
447            if contingent_order.is_closed() {
448                self.submit_order_commands.remove(&order.client_order_id());
449                continue;
450            }
451
452            match order.contingency_type() {
453                Some(ContingencyType::Oto) => {
454                    if order.is_closed()
455                        && filled_qty.raw == 0
456                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
457                    {
458                        self.cancel_order(&contingent_order);
459                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
460                        self.modify_order_quantity(&mut contingent_order, filled_qty);
461                    }
462                }
463                Some(ContingencyType::Oco) => {
464                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
465                        self.cancel_order(&contingent_order);
466                    }
467                }
468                Some(ContingencyType::Ouo) => {
469                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
470                        || (order.is_closed()
471                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
472                    {
473                        self.cancel_order(&contingent_order);
474                    } else if leaves_qty != contingent_order.leaves_qty() {
475                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
476                    }
477                }
478                _ => {}
479            }
480        }
481    }
482
483    /// # Panics
484    ///
485    /// Panics if an OCO contingent order cannot be found for the given client order ID.
486    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
487        let quantity = match order.exec_spawn_id() {
488            Some(exec_spawn_id) => {
489                if let Some(qty) = self
490                    .cache
491                    .borrow()
492                    .exec_spawn_total_quantity(&exec_spawn_id, true)
493                {
494                    qty
495                } else {
496                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
497                    return;
498                }
499            }
500            None => order.quantity(),
501        };
502
503        if quantity.raw == 0 {
504            return;
505        }
506
507        let linked_orders = if let Some(orders) = order.linked_order_ids() {
508            orders
509        } else {
510            log::error!("No linked orders found for contingent order");
511            return;
512        };
513
514        for client_order_id in linked_orders {
515            let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
516                Some(contingent_order) => contingent_order,
517                None => panic!(
518                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
519                ),
520            };
521
522            if !self.should_manage_order(&contingent_order)
523                || client_order_id == &order.client_order_id()
524                || contingent_order.is_closed()
525            {
526                continue;
527            }
528
529            if let Some(contingency_type) = order.contingency_type()
530                && matches!(
531                    contingency_type,
532                    ContingencyType::Oto | ContingencyType::Oco
533                )
534                && quantity != contingent_order.quantity()
535            {
536                self.modify_order_quantity(&mut contingent_order, quantity);
537            }
538        }
539    }
540
541    // Message sending methods
542    pub fn send_emulator_command(&self, command: TradingCommand) {
543        log_cmd_send(&command);
544        msgbus::send_any("OrderEmulator.execute".into(), &command);
545    }
546
547    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
548        let id = command.strategy_id;
549        log::info!("{id} {CMD}{SEND} {command}");
550
551        let endpoint = format!("{exec_algorithm_id}.execute");
552        msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
553    }
554
555    pub fn send_risk_command(&self, command: TradingCommand) {
556        log_cmd_send(&command);
557        msgbus::send_any("RiskEngine.execute".into(), &command);
558    }
559
560    pub fn send_exec_command(&self, command: TradingCommand) {
561        log_cmd_send(&command);
562        msgbus::send_any("ExecEngine.execute".into(), &command);
563    }
564
565    pub fn send_risk_event(&self, event: OrderEventAny) {
566        log_evt_send(&event);
567        msgbus::send_any("RiskEngine.process".into(), &event);
568    }
569
570    pub fn send_exec_event(&self, event: OrderEventAny) {
571        log_evt_send(&event);
572        msgbus::send_any("ExecEngine.process".into(), &event);
573    }
574}
575
576#[inline(always)]
577fn log_cmd_send(command: &TradingCommand) {
578    if let Some(id) = command.strategy_id() {
579        log::info!("{id} {CMD}{SEND} {command}");
580    } else {
581        log::info!("{CMD}{SEND} {command}");
582    }
583}
584
585#[inline(always)]
586fn log_evt_send(event: &OrderEventAny) {
587    let id = event.strategy_id();
588    log::info!("{id} {EVT}{SEND} {event}");
589}
590
591#[cfg(test)]
592mod tests {
593    use nautilus_core::UUID4;
594    use nautilus_model::{
595        events::{OrderAccepted, OrderSubmitted},
596        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
597    };
598    use rstest::rstest;
599
600    use super::*;
601
602    /// Verifies unhandled order events are no-ops and don't panic.
603    /// Previously, unhandled events would hit a todo!() panic.
604    #[rstest]
605    fn test_handle_event_unhandled_events_are_noop() {
606        let submitted = OrderEventAny::Submitted(OrderSubmitted {
607            trader_id: TraderId::from("TRADER-001"),
608            strategy_id: StrategyId::from("STRATEGY-001"),
609            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
610            client_order_id: ClientOrderId::from("O-001"),
611            account_id: AccountId::from("ACCOUNT-001"),
612            event_id: UUID4::new(),
613            ts_event: Default::default(),
614            ts_init: Default::default(),
615        });
616        let accepted = OrderEventAny::Accepted(OrderAccepted {
617            trader_id: TraderId::from("TRADER-001"),
618            strategy_id: StrategyId::from("STRATEGY-001"),
619            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
620            client_order_id: ClientOrderId::from("O-001"),
621            venue_order_id: VenueOrderId::from("V-001"),
622            account_id: AccountId::from("ACCOUNT-001"),
623            event_id: UUID4::new(),
624            ts_event: Default::default(),
625            ts_init: Default::default(),
626            reconciliation: 0,
627        });
628
629        match submitted {
630            OrderEventAny::Rejected(_) => panic!("Should not match"),
631            OrderEventAny::Canceled(_) => panic!("Should not match"),
632            OrderEventAny::Expired(_) => panic!("Should not match"),
633            OrderEventAny::Updated(_) => panic!("Should not match"),
634            OrderEventAny::Filled(_) => panic!("Should not match"),
635            _ => {}
636        }
637        match accepted {
638            OrderEventAny::Rejected(_) => panic!("Should not match"),
639            OrderEventAny::Canceled(_) => panic!("Should not match"),
640            OrderEventAny::Expired(_) => panic!("Should not match"),
641            OrderEventAny::Updated(_) => panic!("Should not match"),
642            OrderEventAny::Filled(_) => panic!("Should not match"),
643            _ => {}
644        }
645    }
646}