nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    logging::{CMD, EVT, SEND},
26    messages::execution::{SubmitOrder, TradingCommand},
27    msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31    enums::{ContingencyType, TriggerType},
32    events::{
33        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34    },
35    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36    orders::{Order, OrderAny},
37    types::Quantity,
38};
39
40/// Manages the lifecycle and state of orders with contingency handling.
41///
42/// The order manager is responsible for managing local order state, handling
43/// contingent orders (OTO, OCO, OUO), and coordinating with emulation and
44/// execution systems. It tracks order commands and manages complex order
45/// relationships for advanced order types.
46pub struct OrderManager {
47    clock: Rc<RefCell<dyn Clock>>,
48    cache: Rc<RefCell<Cache>>,
49    active_local: bool,
50    // submit_order_handler: Option<SubmitOrderHandlerAny>,
51    // cancel_order_handler: Option<CancelOrderHandlerAny>,
52    // modify_order_handler: Option<ModifyOrderHandlerAny>,
53    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
54}
55
56impl Debug for OrderManager {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct(stringify!(OrderManager))
59            .field("pending_commands", &self.submit_order_commands.len())
60            .finish()
61    }
62}
63
64impl OrderManager {
65    /// Creates a new [`OrderManager`] instance.
66    pub fn new(
67        clock: Rc<RefCell<dyn Clock>>,
68        cache: Rc<RefCell<Cache>>,
69        active_local: bool,
70        // submit_order_handler: Option<SubmitOrderHandlerAny>,
71        // cancel_order_handler: Option<CancelOrderHandlerAny>,
72        // modify_order_handler: Option<ModifyOrderHandlerAny>,
73    ) -> Self {
74        Self {
75            clock,
76            cache,
77            active_local,
78            // submit_order_handler,
79            // cancel_order_handler,
80            // modify_order_handler,
81            submit_order_commands: HashMap::new(),
82        }
83    }
84
85    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
86    //     self.submit_order_handler = Some(handler);
87    // }
88    //
89    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
90    //     self.cancel_order_handler = Some(handler);
91    // }
92    //
93    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
94    //     self.modify_order_handler = Some(handler);
95    // }
96
97    #[must_use]
98    /// Returns a copy of all cached submit order commands.
99    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100        self.submit_order_commands.clone()
101    }
102
103    /// Caches a submit order command for later processing.
104    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105        self.submit_order_commands
106            .insert(command.order.client_order_id(), command);
107    }
108
109    /// Removes and returns a cached submit order command.
110    pub fn pop_submit_order_command(
111        &mut self,
112        client_order_id: ClientOrderId,
113    ) -> Option<SubmitOrder> {
114        self.submit_order_commands.remove(&client_order_id)
115    }
116
117    /// Resets the order manager by clearing all cached commands.
118    pub fn reset(&mut self) {
119        self.submit_order_commands.clear();
120    }
121
122    /// Cancels an order if it's not already pending cancellation or closed.
123    pub fn cancel_order(&mut self, order: &OrderAny) {
124        if self
125            .cache
126            .borrow()
127            .is_order_pending_cancel_local(&order.client_order_id())
128        {
129            return;
130        }
131
132        if order.is_closed() {
133            log::warn!("Cannot cancel order: already closed");
134            return;
135        }
136
137        self.submit_order_commands.remove(&order.client_order_id());
138
139        // if let Some(handler) = &self.cancel_order_handler {
140        //     handler.handle_cancel_order(order);
141        // }
142    }
143
144    /// Modifies the quantity of an existing order.
145    pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
146        // if let Some(handler) = &self.modify_order_handler {
147        //     handler.handle_modify_order(order, new_quantity);
148        // }
149    }
150
151    /// # Errors
152    ///
153    /// Returns an error if creating a new submit order fails.
154    pub fn create_new_submit_order(
155        &mut self,
156        order: &OrderAny,
157        position_id: Option<PositionId>,
158        client_id: Option<ClientId>,
159    ) -> anyhow::Result<()> {
160        let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161        let venue_order_id = order
162            .venue_order_id()
163            .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165        let submit = SubmitOrder::new(
166            order.trader_id(),
167            client_id,
168            order.strategy_id(),
169            order.instrument_id(),
170            order.client_order_id(),
171            venue_order_id,
172            order.clone(),
173            order.exec_algorithm_id(),
174            position_id,
175            None, // params
176            UUID4::new(),
177            self.clock.borrow().timestamp_ns(),
178        )?;
179
180        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
181            self.cache_submit_order_command(submit.clone());
182
183            match order.exec_algorithm_id() {
184                Some(exec_algorithm_id) => {
185                    self.send_algo_command(submit, exec_algorithm_id);
186                }
187                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
188            }
189        } // else if let Some(handler) = &self.submit_order_handler {
190        //     handler.handle_submit_order(submit);
191        // }
192
193        Ok(())
194    }
195
196    #[must_use]
197    /// Returns true if the order manager should manage the given order.
198    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
199        self.active_local && order.is_active_local()
200    }
201
202    // Event Handlers
203    /// Handles an order event by routing it to the appropriate handler method.
204    pub fn handle_event(&mut self, event: OrderEventAny) {
205        match event {
206            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
207            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
208            OrderEventAny::Expired(event) => self.handle_order_expired(event),
209            OrderEventAny::Updated(event) => self.handle_order_updated(event),
210            OrderEventAny::Filled(event) => self.handle_order_filled(event),
211            _ => self.handle_position_event(event),
212        }
213    }
214
215    /// Handles an order rejected event and manages any contingent orders.
216    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
217        let cloned_order = self
218            .cache
219            .borrow()
220            .order(&rejected.client_order_id)
221            .cloned();
222        if let Some(order) = cloned_order {
223            if order.contingency_type() != Some(ContingencyType::NoContingency) {
224                self.handle_contingencies(order);
225            }
226        } else {
227            log::error!(
228                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
229                rejected.client_order_id,
230                rejected
231            );
232        }
233    }
234
235    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
236        let cloned_order = self
237            .cache
238            .borrow()
239            .order(&canceled.client_order_id)
240            .cloned();
241        if let Some(order) = cloned_order {
242            if order.contingency_type() != Some(ContingencyType::NoContingency) {
243                self.handle_contingencies(order);
244            }
245        } else {
246            log::error!(
247                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
248                canceled.client_order_id,
249                canceled
250            );
251        }
252    }
253
254    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
255        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
256        if let Some(order) = cloned_order {
257            if order.contingency_type() != Some(ContingencyType::NoContingency) {
258                self.handle_contingencies(order);
259            }
260        } else {
261            log::error!(
262                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
263                expired.client_order_id,
264                expired
265            );
266        }
267    }
268
269    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
270        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
271        if let Some(order) = cloned_order {
272            if order.contingency_type() != Some(ContingencyType::NoContingency) {
273                self.handle_contingencies_update(order);
274            }
275        } else {
276            log::error!(
277                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
278                updated.client_order_id,
279                updated
280            );
281        }
282    }
283
284    /// # Panics
285    ///
286    /// Panics if the OTO child order cannot be found for the given client order ID.
287    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
288        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
289        {
290            order
291        } else {
292            log::error!(
293                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
294                filled.client_order_id,
295                filled
296            );
297            return;
298        };
299
300        match order.contingency_type() {
301            Some(ContingencyType::Oto) => {
302                let position_id = self
303                    .cache
304                    .borrow()
305                    .position_id(&order.client_order_id())
306                    .copied();
307                let client_id = self
308                    .cache
309                    .borrow()
310                    .client_id(&order.client_order_id())
311                    .copied();
312
313                let parent_filled_qty = match order.exec_spawn_id() {
314                    Some(spawn_id) => {
315                        if let Some(qty) = self
316                            .cache
317                            .borrow()
318                            .exec_spawn_total_filled_qty(&spawn_id, true)
319                        {
320                            qty
321                        } else {
322                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
323                            return;
324                        }
325                    }
326                    None => order.filled_qty(),
327                };
328
329                let linked_orders = if let Some(orders) = order.linked_order_ids() {
330                    orders
331                } else {
332                    log::error!("No linked orders found for OTO order");
333                    return;
334                };
335
336                for client_order_id in linked_orders {
337                    let mut child_order =
338                        if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
339                            order
340                        } else {
341                            panic!(
342                                "Cannot find OTO child order for client_order_id: {client_order_id}"
343                            );
344                        };
345
346                    if !self.should_manage_order(&child_order) {
347                        continue;
348                    }
349
350                    if child_order.position_id().is_none() {
351                        child_order.set_position_id(position_id);
352                    }
353
354                    if parent_filled_qty != child_order.leaves_qty() {
355                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
356                    }
357
358                    // if self.submit_order_handler.is_none() {
359                    //     return;
360                    // }
361
362                    if !self
363                        .submit_order_commands
364                        .contains_key(&child_order.client_order_id())
365                        && let Err(e) =
366                            self.create_new_submit_order(&child_order, position_id, client_id)
367                    {
368                        log::error!("Failed to create new submit order: {e}");
369                    }
370                }
371            }
372            Some(ContingencyType::Oco) => {
373                let linked_orders = if let Some(orders) = order.linked_order_ids() {
374                    orders
375                } else {
376                    log::error!("No linked orders found for OCO order");
377                    return;
378                };
379
380                for client_order_id in linked_orders {
381                    let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
382                    {
383                        Some(contingent_order) => contingent_order,
384                        None => {
385                            panic!(
386                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
387                            );
388                        }
389                    };
390
391                    // Not being managed || Already completed
392                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
393                    {
394                        continue;
395                    }
396                    if contingent_order.client_order_id() != order.client_order_id() {
397                        self.cancel_order(&contingent_order);
398                    }
399                }
400            }
401            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
402            _ => {}
403        }
404    }
405
406    /// # Panics
407    ///
408    /// Panics if a contingent order cannot be found for the given client order ID.
409    pub fn handle_contingencies(&mut self, order: OrderAny) {
410        let (filled_qty, leaves_qty, is_spawn_active) =
411            if let Some(exec_spawn_id) = order.exec_spawn_id() {
412                if let (Some(filled), Some(leaves)) = (
413                    self.cache
414                        .borrow()
415                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
416                    self.cache
417                        .borrow()
418                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
419                ) {
420                    (filled, leaves, leaves.raw > 0)
421                } else {
422                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
423                    return;
424                }
425            } else {
426                (order.filled_qty(), order.leaves_qty(), false)
427            };
428
429        let linked_orders = if let Some(orders) = order.linked_order_ids() {
430            orders
431        } else {
432            log::error!("No linked orders found");
433            return;
434        };
435
436        for client_order_id in linked_orders {
437            let mut contingent_order =
438                if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
439                    order
440                } else {
441                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
442                };
443
444            if !self.should_manage_order(&contingent_order)
445                || client_order_id == &order.client_order_id()
446            {
447                continue;
448            }
449
450            if contingent_order.is_closed() {
451                self.submit_order_commands.remove(&order.client_order_id());
452                continue;
453            }
454
455            match order.contingency_type() {
456                Some(ContingencyType::Oto) => {
457                    if order.is_closed()
458                        && filled_qty.raw == 0
459                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
460                    {
461                        self.cancel_order(&contingent_order);
462                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
463                        self.modify_order_quantity(&mut contingent_order, filled_qty);
464                    }
465                }
466                Some(ContingencyType::Oco) => {
467                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
468                        self.cancel_order(&contingent_order);
469                    }
470                }
471                Some(ContingencyType::Ouo) => {
472                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
473                        || (order.is_closed()
474                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
475                    {
476                        self.cancel_order(&contingent_order);
477                    } else if leaves_qty != contingent_order.leaves_qty() {
478                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
479                    }
480                }
481                _ => {}
482            }
483        }
484    }
485
486    /// # Panics
487    ///
488    /// Panics if an OCO contingent order cannot be found for the given client order ID.
489    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
490        let quantity = match order.exec_spawn_id() {
491            Some(exec_spawn_id) => {
492                if let Some(qty) = self
493                    .cache
494                    .borrow()
495                    .exec_spawn_total_quantity(&exec_spawn_id, true)
496                {
497                    qty
498                } else {
499                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
500                    return;
501                }
502            }
503            None => order.quantity(),
504        };
505
506        if quantity.raw == 0 {
507            return;
508        }
509
510        let linked_orders = if let Some(orders) = order.linked_order_ids() {
511            orders
512        } else {
513            log::error!("No linked orders found for contingent order");
514            return;
515        };
516
517        for client_order_id in linked_orders {
518            let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
519                Some(contingent_order) => contingent_order,
520                None => panic!(
521                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
522                ),
523            };
524
525            if !self.should_manage_order(&contingent_order)
526                || client_order_id == &order.client_order_id()
527                || contingent_order.is_closed()
528            {
529                continue;
530            }
531
532            if let Some(contingency_type) = order.contingency_type()
533                && matches!(
534                    contingency_type,
535                    ContingencyType::Oto | ContingencyType::Oco
536                )
537                && quantity != contingent_order.quantity()
538            {
539                self.modify_order_quantity(&mut contingent_order, quantity);
540            }
541        }
542    }
543
544    pub fn handle_position_event(&mut self, _event: OrderEventAny) {
545        todo!()
546    }
547
548    // Message sending methods
549    pub fn send_emulator_command(&self, command: TradingCommand) {
550        log::info!("{CMD}{SEND} {command}");
551
552        msgbus::send_any("OrderEmulator.execute".into(), &command);
553    }
554
555    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
556        log::info!("{CMD}{SEND} {command}");
557
558        let endpoint = format!("{exec_algorithm_id}.execute");
559        msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
560    }
561
562    pub fn send_risk_command(&self, command: TradingCommand) {
563        log::info!("{CMD}{SEND} {command}");
564        msgbus::send_any("RiskEngine.execute".into(), &command);
565    }
566
567    pub fn send_exec_command(&self, command: TradingCommand) {
568        log::info!("{CMD}{SEND} {command}");
569        msgbus::send_any("ExecEngine.execute".into(), &command);
570    }
571
572    pub fn send_risk_event(&self, event: OrderEventAny) {
573        log::info!("{EVT}{SEND} {event}");
574        msgbus::send_any("RiskEngine.process".into(), &event);
575    }
576
577    pub fn send_exec_event(&self, event: OrderEventAny) {
578        log::info!("{EVT}{SEND} {event}");
579        msgbus::send_any("ExecEngine.process".into(), &event);
580    }
581}