nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, collections::HashMap, rc::Rc};
17
18use nautilus_common::{
19    cache::Cache,
20    clock::Clock,
21    logging::{CMD, EVT, SENT},
22    msgbus::MessageBus,
23};
24use nautilus_core::UUID4;
25use nautilus_model::{
26    enums::{ContingencyType, TriggerType},
27    events::{
28        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
29    },
30    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
31    orders::OrderAny,
32    types::Quantity,
33};
34use ustr::Ustr;
35
36use crate::messages::{
37    SubmitOrder, TradingCommand,
38    cancel::{CancelOrderHandler, CancelOrderHandlerAny},
39    modify::{ModifyOrderHandler, ModifyOrderHandlerAny},
40    submit::{SubmitOrderHandler, SubmitOrderHandlerAny},
41};
42
43pub struct OrderManager {
44    clock: Rc<RefCell<dyn Clock>>,
45    cache: Rc<RefCell<Cache>>,
46    msgbus: Rc<RefCell<MessageBus>>,
47    active_local: bool,
48    submit_order_handler: Option<SubmitOrderHandlerAny>,
49    cancel_order_handler: Option<CancelOrderHandlerAny>,
50    modify_order_handler: Option<ModifyOrderHandlerAny>,
51    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
52}
53
54impl OrderManager {
55    pub fn new(
56        clock: Rc<RefCell<dyn Clock>>,
57        msgbus: Rc<RefCell<MessageBus>>,
58        cache: Rc<RefCell<Cache>>,
59        active_local: bool,
60        submit_order_handler: Option<SubmitOrderHandlerAny>,
61        cancel_order_handler: Option<CancelOrderHandlerAny>,
62        modify_order_handler: Option<ModifyOrderHandlerAny>,
63    ) -> Self {
64        Self {
65            clock,
66            cache,
67            msgbus,
68            active_local,
69            submit_order_handler,
70            cancel_order_handler,
71            modify_order_handler,
72            submit_order_commands: HashMap::new(),
73        }
74    }
75
76    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
77        self.submit_order_handler = Some(handler);
78    }
79
80    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
81        self.cancel_order_handler = Some(handler);
82    }
83
84    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
85        self.modify_order_handler = Some(handler);
86    }
87
88    #[must_use]
89    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
90        self.submit_order_commands.clone()
91    }
92
93    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
94        self.submit_order_commands
95            .insert(command.order.client_order_id(), command);
96    }
97
98    pub fn pop_submit_order_command(
99        &mut self,
100        client_order_id: ClientOrderId,
101    ) -> Option<SubmitOrder> {
102        self.submit_order_commands.remove(&client_order_id)
103    }
104
105    pub fn reset(&mut self) {
106        self.submit_order_commands.clear();
107    }
108
109    pub fn cancel_order(&mut self, order: &OrderAny) {
110        if self
111            .cache
112            .borrow()
113            .is_order_pending_cancel_local(&order.client_order_id())
114        {
115            return;
116        }
117
118        if order.is_closed() {
119            log::warn!("Cannot cancel order: already closed");
120            return;
121        }
122
123        self.submit_order_commands.remove(&order.client_order_id());
124
125        if let Some(handler) = &self.cancel_order_handler {
126            handler.handle_cancel_order(order);
127        }
128    }
129
130    pub fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
131        if let Some(handler) = &self.modify_order_handler {
132            handler.handle_modify_order(order, new_quantity);
133        }
134    }
135
136    pub fn create_new_submit_order(
137        &mut self,
138        order: &OrderAny,
139        position_id: Option<PositionId>,
140        client_id: Option<ClientId>,
141    ) -> anyhow::Result<()> {
142        let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
143        let venue_order_id = order
144            .venue_order_id()
145            .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
146
147        let submit = SubmitOrder::new(
148            order.trader_id(),
149            client_id,
150            order.strategy_id(),
151            order.instrument_id(),
152            order.client_order_id(),
153            venue_order_id,
154            order.clone(),
155            order.exec_algorithm_id(),
156            position_id,
157            UUID4::new(),
158            self.clock.borrow().timestamp_ns(),
159        )?;
160
161        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
162            self.cache_submit_order_command(submit.clone());
163
164            match order.exec_algorithm_id() {
165                Some(exec_algorithm_id) => {
166                    self.send_algo_command(submit, exec_algorithm_id);
167                }
168                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
169            }
170        } else if let Some(handler) = &self.submit_order_handler {
171            handler.handle_submit_order(submit);
172        }
173
174        Ok(())
175    }
176
177    #[must_use]
178    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
179        self.active_local && order.is_active_local()
180    }
181
182    // Event Handlers
183    pub fn handle_event(&mut self, event: OrderEventAny) {
184        match event {
185            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
186            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
187            OrderEventAny::Expired(event) => self.handle_order_expired(event),
188            OrderEventAny::Updated(event) => self.handle_order_updated(event),
189            OrderEventAny::Filled(event) => self.handle_order_filled(event),
190            _ => self.handle_position_event(event),
191        }
192    }
193
194    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
195        let cloned_order = self
196            .cache
197            .borrow()
198            .order(&rejected.client_order_id)
199            .cloned();
200        if let Some(order) = cloned_order {
201            if order.contingency_type() != Some(ContingencyType::NoContingency) {
202                self.handle_contingencies(order);
203            }
204        } else {
205            log::error!(
206                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
207                rejected.client_order_id,
208                rejected
209            );
210        }
211    }
212
213    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
214        let cloned_order = self
215            .cache
216            .borrow()
217            .order(&canceled.client_order_id)
218            .cloned();
219        if let Some(order) = cloned_order {
220            if order.contingency_type() != Some(ContingencyType::NoContingency) {
221                self.handle_contingencies(order);
222            }
223        } else {
224            log::error!(
225                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
226                canceled.client_order_id,
227                canceled
228            );
229        }
230    }
231
232    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
233        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
234        if let Some(order) = cloned_order {
235            if order.contingency_type() != Some(ContingencyType::NoContingency) {
236                self.handle_contingencies(order);
237            }
238        } else {
239            log::error!(
240                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
241                expired.client_order_id,
242                expired
243            );
244        }
245    }
246
247    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
248        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
249        if let Some(order) = cloned_order {
250            if order.contingency_type() != Some(ContingencyType::NoContingency) {
251                self.handle_contingencies_update(order);
252            }
253        } else {
254            log::error!(
255                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
256                updated.client_order_id,
257                updated
258            );
259        }
260    }
261
262    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
263        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
264        {
265            order
266        } else {
267            log::error!(
268                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
269                filled.client_order_id,
270                filled
271            );
272            return;
273        };
274
275        match order.contingency_type() {
276            Some(ContingencyType::Oto) => {
277                let position_id = self
278                    .cache
279                    .borrow()
280                    .position_id(&order.client_order_id())
281                    .copied();
282                let client_id = self
283                    .cache
284                    .borrow()
285                    .client_id(&order.client_order_id())
286                    .copied();
287
288                let parent_filled_qty = match order.exec_spawn_id() {
289                    Some(spawn_id) => {
290                        if let Some(qty) = self
291                            .cache
292                            .borrow()
293                            .exec_spawn_total_filled_qty(&spawn_id, true)
294                        {
295                            qty
296                        } else {
297                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
298                            return;
299                        }
300                    }
301                    None => order.filled_qty(),
302                };
303
304                let linked_orders = if let Some(orders) = order.linked_order_ids() {
305                    orders
306                } else {
307                    log::error!("No linked orders found for OTO order");
308                    return;
309                };
310
311                for client_order_id in linked_orders {
312                    let mut child_order =
313                        if let Some(order) = self.cache.borrow().order(&client_order_id).cloned() {
314                            order
315                        } else {
316                            panic!(
317                                "Cannot find OTO child order for client_order_id: {client_order_id}"
318                            );
319                        };
320
321                    if !self.should_manage_order(&child_order) {
322                        continue;
323                    }
324
325                    if child_order.position_id().is_none() {
326                        child_order.set_position_id(position_id);
327                    }
328
329                    if parent_filled_qty != child_order.leaves_qty() {
330                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
331                    }
332
333                    if self.submit_order_handler.is_none() {
334                        return;
335                    }
336
337                    if !self
338                        .submit_order_commands
339                        .contains_key(&child_order.client_order_id())
340                    {
341                        if let Err(e) =
342                            self.create_new_submit_order(&child_order, position_id, client_id)
343                        {
344                            log::error!("Failed to create new submit order: {e}");
345                        }
346                    }
347                }
348            }
349            Some(ContingencyType::Oco) => {
350                let linked_orders = if let Some(orders) = order.linked_order_ids() {
351                    orders
352                } else {
353                    log::error!("No linked orders found for OCO order");
354                    return;
355                };
356
357                for client_order_id in linked_orders {
358                    let contingent_order = match self
359                        .cache
360                        .borrow()
361                        .order(&client_order_id)
362                        .cloned()
363                    {
364                        Some(contingent_order) => contingent_order,
365                        None => {
366                            panic!(
367                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
368                            );
369                        }
370                    };
371
372                    // Not being managed || Already completed
373                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
374                    {
375                        continue;
376                    }
377                    if contingent_order.client_order_id() != order.client_order_id() {
378                        self.cancel_order(&contingent_order);
379                    }
380                }
381            }
382            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
383            _ => {}
384        }
385    }
386
387    pub fn handle_contingencies(&mut self, order: OrderAny) {
388        let (filled_qty, leaves_qty, is_spawn_active) =
389            if let Some(exec_spawn_id) = order.exec_spawn_id() {
390                if let (Some(filled), Some(leaves)) = (
391                    self.cache
392                        .borrow()
393                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
394                    self.cache
395                        .borrow()
396                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
397                ) {
398                    (filled, leaves, leaves.raw > 0)
399                } else {
400                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
401                    return;
402                }
403            } else {
404                (order.filled_qty(), order.leaves_qty(), false)
405            };
406
407        let linked_orders = if let Some(orders) = order.linked_order_ids() {
408            orders
409        } else {
410            log::error!("No linked orders found");
411            return;
412        };
413
414        for client_order_id in linked_orders {
415            let mut contingent_order =
416                if let Some(order) = self.cache.borrow().order(&client_order_id).cloned() {
417                    order
418                } else {
419                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
420                };
421
422            if !self.should_manage_order(&contingent_order)
423                || client_order_id == order.client_order_id()
424            {
425                continue;
426            }
427
428            if contingent_order.is_closed() {
429                self.submit_order_commands.remove(&order.client_order_id());
430                continue;
431            }
432
433            match order.contingency_type() {
434                Some(ContingencyType::Oto) => {
435                    if order.is_closed()
436                        && filled_qty.raw == 0
437                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
438                    {
439                        self.cancel_order(&contingent_order);
440                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
441                        self.modify_order_quantity(&mut contingent_order, filled_qty);
442                    }
443                }
444                Some(ContingencyType::Oco) => {
445                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
446                        self.cancel_order(&contingent_order);
447                    }
448                }
449                Some(ContingencyType::Ouo) => {
450                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
451                        || (order.is_closed()
452                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
453                    {
454                        self.cancel_order(&contingent_order);
455                    } else if leaves_qty != contingent_order.leaves_qty() {
456                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
457                    }
458                }
459                _ => {}
460            }
461        }
462    }
463
464    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
465        let quantity = match order.exec_spawn_id() {
466            Some(exec_spawn_id) => {
467                if let Some(qty) = self
468                    .cache
469                    .borrow()
470                    .exec_spawn_total_quantity(&exec_spawn_id, true)
471                {
472                    qty
473                } else {
474                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
475                    return;
476                }
477            }
478            None => order.quantity(),
479        };
480
481        if quantity.raw == 0 {
482            return;
483        }
484
485        let linked_orders = if let Some(orders) = order.linked_order_ids() {
486            orders
487        } else {
488            log::error!("No linked orders found for contingent order");
489            return;
490        };
491
492        for client_order_id in linked_orders {
493            let mut contingent_order = match self.cache.borrow().order(&client_order_id).cloned() {
494                Some(contingent_order) => contingent_order,
495                None => panic!(
496                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
497                ),
498            };
499
500            if !self.should_manage_order(&contingent_order)
501                || client_order_id == order.client_order_id()
502                || contingent_order.is_closed()
503            {
504                continue;
505            }
506
507            if let Some(contingency_type) = order.contingency_type() {
508                if matches!(
509                    contingency_type,
510                    ContingencyType::Oto | ContingencyType::Oco
511                ) && quantity != contingent_order.quantity()
512                {
513                    self.modify_order_quantity(&mut contingent_order, quantity);
514                }
515            }
516        }
517    }
518
519    pub fn handle_position_event(&mut self, _event: OrderEventAny) {
520        todo!()
521    }
522
523    // Message sending methods
524    pub fn send_emulator_command(&self, command: TradingCommand) {
525        log::info!("{CMD}{SENT} {command}");
526
527        self.msgbus
528            .borrow()
529            .send(&Ustr::from("OrderEmulator.execute"), &command);
530    }
531
532    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
533        log::info!("{CMD}{SENT} {command}");
534
535        let endpoint = format!("{exec_algorithm_id}.execute");
536        self.msgbus.borrow().send(
537            &Ustr::from(&endpoint),
538            &TradingCommand::SubmitOrder(command),
539        );
540    }
541
542    pub fn send_risk_command(&self, command: TradingCommand) {
543        log::info!("{CMD}{SENT} {command}");
544
545        self.msgbus
546            .borrow()
547            .send(&Ustr::from("RiskEngine.execute"), &command);
548    }
549
550    pub fn send_exec_command(&self, command: TradingCommand) {
551        log::info!("{CMD}{SENT} {command}");
552
553        self.msgbus
554            .borrow()
555            .send(&Ustr::from("ExecEngine.execute"), &command);
556    }
557
558    pub fn send_risk_event(&self, event: OrderEventAny) {
559        log::info!("{}{} {}", EVT, SENT, event);
560        self.msgbus
561            .borrow()
562            .send(&Ustr::from("RiskEngine.process"), &event);
563    }
564
565    pub fn send_exec_event(&self, event: OrderEventAny) {
566        log::info!("{}{} {}", EVT, SENT, event);
567        self.msgbus
568            .borrow()
569            .send(&Ustr::from("ExecEngine.process"), &event);
570    }
571}