nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    logging::{CMD, EVT, SEND},
26    messages::execution::{SubmitOrder, TradingCommand},
27    msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31    enums::{ContingencyType, TriggerType},
32    events::{
33        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34    },
35    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36    orders::{Order, OrderAny},
37    types::Quantity,
38};
39
/// Manages the lifecycle and state of orders with contingency handling.
///
/// The order manager is responsible for managing local order state, handling
/// contingent orders (OTO, OCO, OUO), and coordinating with emulation and
/// execution systems. It tracks order commands and manages complex order
/// relationships for advanced order types.
pub struct OrderManager {
    /// Shared clock; read for the timestamp of newly created submit order commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, position IDs, client IDs and exec spawn totals.
    cache: Rc<RefCell<Cache>>,
    /// When `false`, `should_manage_order` returns `false` for every order.
    active_local: bool,
    // submit_order_handler: Option<SubmitOrderHandlerAny>,
    // cancel_order_handler: Option<CancelOrderHandlerAny>,
    // modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// Submit order commands cached by client order ID until popped, canceled or reset.
    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
}
55
56impl Debug for OrderManager {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct(stringify!(OrderManager))
59            .field("pending_commands", &self.submit_order_commands.len())
60            .finish()
61    }
62}
63
64impl OrderManager {
65    /// Creates a new [`OrderManager`] instance.
66    pub fn new(
67        clock: Rc<RefCell<dyn Clock>>,
68        cache: Rc<RefCell<Cache>>,
69        active_local: bool,
70        // submit_order_handler: Option<SubmitOrderHandlerAny>,
71        // cancel_order_handler: Option<CancelOrderHandlerAny>,
72        // modify_order_handler: Option<ModifyOrderHandlerAny>,
73    ) -> Self {
74        Self {
75            clock,
76            cache,
77            active_local,
78            // submit_order_handler,
79            // cancel_order_handler,
80            // modify_order_handler,
81            submit_order_commands: HashMap::new(),
82        }
83    }
84
85    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
86    //     self.submit_order_handler = Some(handler);
87    // }
88    //
89    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
90    //     self.cancel_order_handler = Some(handler);
91    // }
92    //
93    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
94    //     self.modify_order_handler = Some(handler);
95    // }
96
97    #[must_use]
98    /// Returns a copy of all cached submit order commands.
99    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100        self.submit_order_commands.clone()
101    }
102
103    /// Caches a submit order command for later processing.
104    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105        self.submit_order_commands
106            .insert(command.order.client_order_id(), command);
107    }
108
109    /// Removes and returns a cached submit order command.
110    pub fn pop_submit_order_command(
111        &mut self,
112        client_order_id: ClientOrderId,
113    ) -> Option<SubmitOrder> {
114        self.submit_order_commands.remove(&client_order_id)
115    }
116
    /// Resets the order manager by clearing all cached commands.
    ///
    /// Only the cached submit order commands are dropped; the clock, cache and
    /// `active_local` flag are left untouched.
    pub fn reset(&mut self) {
        self.submit_order_commands.clear();
    }
121
122    /// Cancels an order if it's not already pending cancellation or closed.
123    pub fn cancel_order(&mut self, order: &OrderAny) {
124        if self
125            .cache
126            .borrow()
127            .is_order_pending_cancel_local(&order.client_order_id())
128        {
129            return;
130        }
131
132        if order.is_closed() {
133            log::warn!("Cannot cancel order: already closed");
134            return;
135        }
136
137        self.submit_order_commands.remove(&order.client_order_id());
138
139        // if let Some(handler) = &self.cancel_order_handler {
140        //     handler.handle_cancel_order(order);
141        // }
142    }
143
    /// Modifies the quantity of an existing order.
    ///
    /// Currently a no-op: the modify order handler hook in the body is commented out,
    /// so neither `order` nor `new_quantity` is read or changed.
    pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
        // if let Some(handler) = &self.modify_order_handler {
        //     handler.handle_modify_order(order, new_quantity);
        // }
    }
150
151    /// # Errors
152    ///
153    /// Returns an error if creating a new submit order fails.
154    pub fn create_new_submit_order(
155        &mut self,
156        order: &OrderAny,
157        position_id: Option<PositionId>,
158        client_id: Option<ClientId>,
159    ) -> anyhow::Result<()> {
160        let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161        let venue_order_id = order
162            .venue_order_id()
163            .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165        let submit = SubmitOrder::new(
166            order.trader_id(),
167            client_id,
168            order.strategy_id(),
169            order.instrument_id(),
170            order.client_order_id(),
171            venue_order_id,
172            order.clone(),
173            order.exec_algorithm_id(),
174            position_id,
175            None, // params
176            UUID4::new(),
177            self.clock.borrow().timestamp_ns(),
178        )?;
179
180        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
181            self.cache_submit_order_command(submit.clone());
182
183            match order.exec_algorithm_id() {
184                Some(exec_algorithm_id) => {
185                    self.send_algo_command(submit, exec_algorithm_id);
186                }
187                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
188            }
189        } // else if let Some(handler) = &self.submit_order_handler {
190        //     handler.handle_submit_order(submit);
191        // }
192
193        Ok(())
194    }
195
196    #[must_use]
197    /// Returns true if the order manager should manage the given order.
198    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
199        self.active_local && order.is_active_local()
200    }
201
202    // Event Handlers
203    /// Handles an order event by routing it to the appropriate handler method.
204    ///
205    /// Note: Only handles specific terminal/actionable events. Other events
206    /// like `OrderSubmitted`, `OrderAccepted`, etc. are no-ops for the order manager.
207    pub fn handle_event(&mut self, event: OrderEventAny) {
208        match event {
209            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
210            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
211            OrderEventAny::Expired(event) => self.handle_order_expired(event),
212            OrderEventAny::Updated(event) => self.handle_order_updated(event),
213            OrderEventAny::Filled(event) => self.handle_order_filled(event),
214            _ => {}
215        }
216    }
217
218    /// Handles an order rejected event and manages any contingent orders.
219    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
220        let cloned_order = self
221            .cache
222            .borrow()
223            .order(&rejected.client_order_id)
224            .cloned();
225        if let Some(order) = cloned_order {
226            if order.contingency_type() != Some(ContingencyType::NoContingency) {
227                self.handle_contingencies(order);
228            }
229        } else {
230            log::error!(
231                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
232                rejected.client_order_id,
233                rejected
234            );
235        }
236    }
237
238    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
239        let cloned_order = self
240            .cache
241            .borrow()
242            .order(&canceled.client_order_id)
243            .cloned();
244        if let Some(order) = cloned_order {
245            if order.contingency_type() != Some(ContingencyType::NoContingency) {
246                self.handle_contingencies(order);
247            }
248        } else {
249            log::error!(
250                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
251                canceled.client_order_id,
252                canceled
253            );
254        }
255    }
256
257    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
258        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
259        if let Some(order) = cloned_order {
260            if order.contingency_type() != Some(ContingencyType::NoContingency) {
261                self.handle_contingencies(order);
262            }
263        } else {
264            log::error!(
265                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
266                expired.client_order_id,
267                expired
268            );
269        }
270    }
271
272    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
273        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
274        if let Some(order) = cloned_order {
275            if order.contingency_type() != Some(ContingencyType::NoContingency) {
276                self.handle_contingencies_update(order);
277            }
278        } else {
279            log::error!(
280                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
281                updated.client_order_id,
282                updated
283            );
284        }
285    }
286
287    /// # Panics
288    ///
289    /// Panics if the OTO child order cannot be found for the given client order ID.
290    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
291        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
292        {
293            order
294        } else {
295            log::error!(
296                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
297                filled.client_order_id,
298                filled
299            );
300            return;
301        };
302
303        match order.contingency_type() {
304            Some(ContingencyType::Oto) => {
305                let position_id = self
306                    .cache
307                    .borrow()
308                    .position_id(&order.client_order_id())
309                    .copied();
310                let client_id = self
311                    .cache
312                    .borrow()
313                    .client_id(&order.client_order_id())
314                    .copied();
315
316                let parent_filled_qty = match order.exec_spawn_id() {
317                    Some(spawn_id) => {
318                        if let Some(qty) = self
319                            .cache
320                            .borrow()
321                            .exec_spawn_total_filled_qty(&spawn_id, true)
322                        {
323                            qty
324                        } else {
325                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
326                            return;
327                        }
328                    }
329                    None => order.filled_qty(),
330                };
331
332                let linked_orders = if let Some(orders) = order.linked_order_ids() {
333                    orders
334                } else {
335                    log::error!("No linked orders found for OTO order");
336                    return;
337                };
338
339                for client_order_id in linked_orders {
340                    let mut child_order =
341                        if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
342                            order
343                        } else {
344                            panic!(
345                                "Cannot find OTO child order for client_order_id: {client_order_id}"
346                            );
347                        };
348
349                    if !self.should_manage_order(&child_order) {
350                        continue;
351                    }
352
353                    if child_order.position_id().is_none() {
354                        child_order.set_position_id(position_id);
355                    }
356
357                    if parent_filled_qty != child_order.leaves_qty() {
358                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
359                    }
360
361                    // if self.submit_order_handler.is_none() {
362                    //     return;
363                    // }
364
365                    if !self
366                        .submit_order_commands
367                        .contains_key(&child_order.client_order_id())
368                        && let Err(e) =
369                            self.create_new_submit_order(&child_order, position_id, client_id)
370                    {
371                        log::error!("Failed to create new submit order: {e}");
372                    }
373                }
374            }
375            Some(ContingencyType::Oco) => {
376                let linked_orders = if let Some(orders) = order.linked_order_ids() {
377                    orders
378                } else {
379                    log::error!("No linked orders found for OCO order");
380                    return;
381                };
382
383                for client_order_id in linked_orders {
384                    let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
385                    {
386                        Some(contingent_order) => contingent_order,
387                        None => {
388                            panic!(
389                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
390                            );
391                        }
392                    };
393
394                    // Not being managed || Already completed
395                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
396                    {
397                        continue;
398                    }
399                    if contingent_order.client_order_id() != order.client_order_id() {
400                        self.cancel_order(&contingent_order);
401                    }
402                }
403            }
404            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
405            _ => {}
406        }
407    }
408
409    /// # Panics
410    ///
411    /// Panics if a contingent order cannot be found for the given client order ID.
412    pub fn handle_contingencies(&mut self, order: OrderAny) {
413        let (filled_qty, leaves_qty, is_spawn_active) =
414            if let Some(exec_spawn_id) = order.exec_spawn_id() {
415                if let (Some(filled), Some(leaves)) = (
416                    self.cache
417                        .borrow()
418                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
419                    self.cache
420                        .borrow()
421                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
422                ) {
423                    (filled, leaves, leaves.raw > 0)
424                } else {
425                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
426                    return;
427                }
428            } else {
429                (order.filled_qty(), order.leaves_qty(), false)
430            };
431
432        let linked_orders = if let Some(orders) = order.linked_order_ids() {
433            orders
434        } else {
435            log::error!("No linked orders found");
436            return;
437        };
438
439        for client_order_id in linked_orders {
440            let mut contingent_order =
441                if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
442                    order
443                } else {
444                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
445                };
446
447            if !self.should_manage_order(&contingent_order)
448                || client_order_id == &order.client_order_id()
449            {
450                continue;
451            }
452
453            if contingent_order.is_closed() {
454                self.submit_order_commands.remove(&order.client_order_id());
455                continue;
456            }
457
458            match order.contingency_type() {
459                Some(ContingencyType::Oto) => {
460                    if order.is_closed()
461                        && filled_qty.raw == 0
462                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
463                    {
464                        self.cancel_order(&contingent_order);
465                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
466                        self.modify_order_quantity(&mut contingent_order, filled_qty);
467                    }
468                }
469                Some(ContingencyType::Oco) => {
470                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
471                        self.cancel_order(&contingent_order);
472                    }
473                }
474                Some(ContingencyType::Ouo) => {
475                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
476                        || (order.is_closed()
477                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
478                    {
479                        self.cancel_order(&contingent_order);
480                    } else if leaves_qty != contingent_order.leaves_qty() {
481                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
482                    }
483                }
484                _ => {}
485            }
486        }
487    }
488
489    /// # Panics
490    ///
491    /// Panics if an OCO contingent order cannot be found for the given client order ID.
492    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
493        let quantity = match order.exec_spawn_id() {
494            Some(exec_spawn_id) => {
495                if let Some(qty) = self
496                    .cache
497                    .borrow()
498                    .exec_spawn_total_quantity(&exec_spawn_id, true)
499                {
500                    qty
501                } else {
502                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
503                    return;
504                }
505            }
506            None => order.quantity(),
507        };
508
509        if quantity.raw == 0 {
510            return;
511        }
512
513        let linked_orders = if let Some(orders) = order.linked_order_ids() {
514            orders
515        } else {
516            log::error!("No linked orders found for contingent order");
517            return;
518        };
519
520        for client_order_id in linked_orders {
521            let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
522                Some(contingent_order) => contingent_order,
523                None => panic!(
524                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
525                ),
526            };
527
528            if !self.should_manage_order(&contingent_order)
529                || client_order_id == &order.client_order_id()
530                || contingent_order.is_closed()
531            {
532                continue;
533            }
534
535            if let Some(contingency_type) = order.contingency_type()
536                && matches!(
537                    contingency_type,
538                    ContingencyType::Oto | ContingencyType::Oco
539                )
540                && quantity != contingent_order.quantity()
541            {
542                self.modify_order_quantity(&mut contingent_order, quantity);
543            }
544        }
545    }
546
547    // Message sending methods
548    pub fn send_emulator_command(&self, command: TradingCommand) {
549        log_cmd_send(&command);
550        msgbus::send_any("OrderEmulator.execute".into(), &command);
551    }
552
553    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
554        let id = command.strategy_id;
555        log::info!("{id} {CMD}{SEND} {command}");
556
557        let endpoint = format!("{exec_algorithm_id}.execute");
558        msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
559    }
560
561    pub fn send_risk_command(&self, command: TradingCommand) {
562        log_cmd_send(&command);
563        msgbus::send_any("RiskEngine.execute".into(), &command);
564    }
565
566    pub fn send_exec_command(&self, command: TradingCommand) {
567        log_cmd_send(&command);
568        msgbus::send_any("ExecEngine.execute".into(), &command);
569    }
570
571    pub fn send_risk_event(&self, event: OrderEventAny) {
572        log_evt_send(&event);
573        msgbus::send_any("RiskEngine.process".into(), &event);
574    }
575
576    pub fn send_exec_event(&self, event: OrderEventAny) {
577        log_evt_send(&event);
578        msgbus::send_any("ExecEngine.process".into(), &event);
579    }
580}
581
582#[inline(always)]
583fn log_cmd_send(command: &TradingCommand) {
584    if let Some(id) = command.strategy_id() {
585        log::info!("{id} {CMD}{SEND} {command}");
586    } else {
587        log::info!("{CMD}{SEND} {command}");
588    }
589}
590
/// Logs an outgoing order event at info level, prefixed with the event's strategy ID.
#[inline(always)]
fn log_evt_send(event: &OrderEventAny) {
    let id = event.strategy_id();
    log::info!("{id} {EVT}{SEND} {event}");
}
596
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::{
        events::{OrderAccepted, OrderSubmitted},
        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
    };
    use rstest::rstest;

    use super::*;

    /// Panics when `event` is one of the variants that `handle_event` routes to a
    /// dedicated handler (rejected, canceled, expired, updated, filled).
    fn assert_not_routed(event: &OrderEventAny) {
        let routed = matches!(
            event,
            OrderEventAny::Rejected(_)
                | OrderEventAny::Canceled(_)
                | OrderEventAny::Expired(_)
                | OrderEventAny::Updated(_)
                | OrderEventAny::Filled(_)
        );
        assert!(!routed, "Should not match");
    }

    /// Verifies unhandled order events are no-ops and don't panic.
    /// Previously, unhandled events would hit a todo!() panic.
    ///
    /// NOTE(review): this mirrors the variant matching of `handle_event` rather than
    /// calling it on an `OrderManager` instance.
    #[rstest]
    fn test_handle_event_unhandled_events_are_noop() {
        let submitted = OrderEventAny::Submitted(OrderSubmitted {
            trader_id: TraderId::from("TRADER-001"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            client_order_id: ClientOrderId::from("O-001"),
            account_id: AccountId::from("ACCOUNT-001"),
            event_id: UUID4::new(),
            ts_event: Default::default(),
            ts_init: Default::default(),
        });
        let accepted = OrderEventAny::Accepted(OrderAccepted {
            trader_id: TraderId::from("TRADER-001"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            client_order_id: ClientOrderId::from("O-001"),
            venue_order_id: VenueOrderId::from("V-001"),
            account_id: AccountId::from("ACCOUNT-001"),
            event_id: UUID4::new(),
            ts_event: Default::default(),
            ts_init: Default::default(),
            reconciliation: 0,
        });

        assert_not_routed(&submitted);
        assert_not_routed(&accepted);
    }
}