nautilus_execution/order_manager/
manager.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, collections::HashMap, rc::Rc};
17
18use nautilus_common::{
19    cache::Cache,
20    clock::Clock,
21    logging::{CMD, EVT, SENT},
22    msgbus::{self},
23};
24use nautilus_core::UUID4;
25use nautilus_model::{
26    enums::{ContingencyType, TriggerType},
27    events::{
28        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
29    },
30    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
31    orders::{Order, OrderAny},
32    types::Quantity,
33};
34use ustr::Ustr;
35
36use crate::messages::{
37    SubmitOrder, TradingCommand,
38    cancel::{CancelOrderHandler, CancelOrderHandlerAny},
39    modify::{ModifyOrderHandler, ModifyOrderHandlerAny},
40    submit::{SubmitOrderHandler, SubmitOrderHandlerAny},
41};
42
/// Manages orders and reacts to order events on behalf of their linked
/// (contingent) orders.
///
/// Holds optional handler hooks for submit, cancel and modify requests, plus a
/// cache of `SubmitOrder` commands keyed by client order ID.
pub struct OrderManager {
    /// Clock read for the timestamp of newly created `SubmitOrder` commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, position/client IDs and exec-spawn totals.
    cache: Rc<RefCell<Cache>>,
    /// Gate for `should_manage_order`: when `false`, no order is managed.
    active_local: bool,
    /// Optional hook invoked with a `SubmitOrder` command.
    submit_order_handler: Option<SubmitOrderHandlerAny>,
    /// Optional hook invoked when an order should be canceled.
    cancel_order_handler: Option<CancelOrderHandlerAny>,
    /// Optional hook invoked when an order quantity should be modified.
    modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// Cached submit commands, keyed by the client order ID of the command's order.
    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
}
52
53impl OrderManager {
54    pub fn new(
55        clock: Rc<RefCell<dyn Clock>>,
56        cache: Rc<RefCell<Cache>>,
57        active_local: bool,
58        submit_order_handler: Option<SubmitOrderHandlerAny>,
59        cancel_order_handler: Option<CancelOrderHandlerAny>,
60        modify_order_handler: Option<ModifyOrderHandlerAny>,
61    ) -> Self {
62        Self {
63            clock,
64            cache,
65            active_local,
66            submit_order_handler,
67            cancel_order_handler,
68            modify_order_handler,
69            submit_order_commands: HashMap::new(),
70        }
71    }
72
73    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
74        self.submit_order_handler = Some(handler);
75    }
76
77    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
78        self.cancel_order_handler = Some(handler);
79    }
80
81    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
82        self.modify_order_handler = Some(handler);
83    }
84
85    #[must_use]
86    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
87        self.submit_order_commands.clone()
88    }
89
90    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
91        self.submit_order_commands
92            .insert(command.order.client_order_id(), command);
93    }
94
95    pub fn pop_submit_order_command(
96        &mut self,
97        client_order_id: ClientOrderId,
98    ) -> Option<SubmitOrder> {
99        self.submit_order_commands.remove(&client_order_id)
100    }
101
102    pub fn reset(&mut self) {
103        self.submit_order_commands.clear();
104    }
105
106    pub fn cancel_order(&mut self, order: &OrderAny) {
107        if self
108            .cache
109            .borrow()
110            .is_order_pending_cancel_local(&order.client_order_id())
111        {
112            return;
113        }
114
115        if order.is_closed() {
116            log::warn!("Cannot cancel order: already closed");
117            return;
118        }
119
120        self.submit_order_commands.remove(&order.client_order_id());
121
122        if let Some(handler) = &self.cancel_order_handler {
123            handler.handle_cancel_order(order);
124        }
125    }
126
127    pub fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
128        if let Some(handler) = &self.modify_order_handler {
129            handler.handle_modify_order(order, new_quantity);
130        }
131    }
132
133    pub fn create_new_submit_order(
134        &mut self,
135        order: &OrderAny,
136        position_id: Option<PositionId>,
137        client_id: Option<ClientId>,
138    ) -> anyhow::Result<()> {
139        let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
140        let venue_order_id = order
141            .venue_order_id()
142            .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
143
144        let submit = SubmitOrder::new(
145            order.trader_id(),
146            client_id,
147            order.strategy_id(),
148            order.instrument_id(),
149            order.client_order_id(),
150            venue_order_id,
151            order.clone(),
152            order.exec_algorithm_id(),
153            position_id,
154            UUID4::new(),
155            self.clock.borrow().timestamp_ns(),
156        )?;
157
158        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
159            self.cache_submit_order_command(submit.clone());
160
161            match order.exec_algorithm_id() {
162                Some(exec_algorithm_id) => {
163                    self.send_algo_command(submit, exec_algorithm_id);
164                }
165                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
166            }
167        } else if let Some(handler) = &self.submit_order_handler {
168            handler.handle_submit_order(submit);
169        }
170
171        Ok(())
172    }
173
174    #[must_use]
175    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
176        self.active_local && order.is_active_local()
177    }
178
179    // Event Handlers
180    pub fn handle_event(&mut self, event: OrderEventAny) {
181        match event {
182            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
183            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
184            OrderEventAny::Expired(event) => self.handle_order_expired(event),
185            OrderEventAny::Updated(event) => self.handle_order_updated(event),
186            OrderEventAny::Filled(event) => self.handle_order_filled(event),
187            _ => self.handle_position_event(event),
188        }
189    }
190
191    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
192        let cloned_order = self
193            .cache
194            .borrow()
195            .order(&rejected.client_order_id)
196            .cloned();
197        if let Some(order) = cloned_order {
198            if order.contingency_type() != Some(ContingencyType::NoContingency) {
199                self.handle_contingencies(order);
200            }
201        } else {
202            log::error!(
203                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
204                rejected.client_order_id,
205                rejected
206            );
207        }
208    }
209
210    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
211        let cloned_order = self
212            .cache
213            .borrow()
214            .order(&canceled.client_order_id)
215            .cloned();
216        if let Some(order) = cloned_order {
217            if order.contingency_type() != Some(ContingencyType::NoContingency) {
218                self.handle_contingencies(order);
219            }
220        } else {
221            log::error!(
222                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
223                canceled.client_order_id,
224                canceled
225            );
226        }
227    }
228
229    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
230        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
231        if let Some(order) = cloned_order {
232            if order.contingency_type() != Some(ContingencyType::NoContingency) {
233                self.handle_contingencies(order);
234            }
235        } else {
236            log::error!(
237                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
238                expired.client_order_id,
239                expired
240            );
241        }
242    }
243
244    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
245        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
246        if let Some(order) = cloned_order {
247            if order.contingency_type() != Some(ContingencyType::NoContingency) {
248                self.handle_contingencies_update(order);
249            }
250        } else {
251            log::error!(
252                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
253                updated.client_order_id,
254                updated
255            );
256        }
257    }
258
259    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
260        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
261        {
262            order
263        } else {
264            log::error!(
265                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
266                filled.client_order_id,
267                filled
268            );
269            return;
270        };
271
272        match order.contingency_type() {
273            Some(ContingencyType::Oto) => {
274                let position_id = self
275                    .cache
276                    .borrow()
277                    .position_id(&order.client_order_id())
278                    .copied();
279                let client_id = self
280                    .cache
281                    .borrow()
282                    .client_id(&order.client_order_id())
283                    .copied();
284
285                let parent_filled_qty = match order.exec_spawn_id() {
286                    Some(spawn_id) => {
287                        if let Some(qty) = self
288                            .cache
289                            .borrow()
290                            .exec_spawn_total_filled_qty(&spawn_id, true)
291                        {
292                            qty
293                        } else {
294                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
295                            return;
296                        }
297                    }
298                    None => order.filled_qty(),
299                };
300
301                let linked_orders = if let Some(orders) = order.linked_order_ids() {
302                    orders
303                } else {
304                    log::error!("No linked orders found for OTO order");
305                    return;
306                };
307
308                for client_order_id in linked_orders {
309                    let mut child_order =
310                        if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
311                            order
312                        } else {
313                            panic!(
314                                "Cannot find OTO child order for client_order_id: {client_order_id}"
315                            );
316                        };
317
318                    if !self.should_manage_order(&child_order) {
319                        continue;
320                    }
321
322                    if child_order.position_id().is_none() {
323                        child_order.set_position_id(position_id);
324                    }
325
326                    if parent_filled_qty != child_order.leaves_qty() {
327                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
328                    }
329
330                    if self.submit_order_handler.is_none() {
331                        return;
332                    }
333
334                    if !self
335                        .submit_order_commands
336                        .contains_key(&child_order.client_order_id())
337                    {
338                        if let Err(e) =
339                            self.create_new_submit_order(&child_order, position_id, client_id)
340                        {
341                            log::error!("Failed to create new submit order: {e}");
342                        }
343                    }
344                }
345            }
346            Some(ContingencyType::Oco) => {
347                let linked_orders = if let Some(orders) = order.linked_order_ids() {
348                    orders
349                } else {
350                    log::error!("No linked orders found for OCO order");
351                    return;
352                };
353
354                for client_order_id in linked_orders {
355                    let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
356                    {
357                        Some(contingent_order) => contingent_order,
358                        None => {
359                            panic!(
360                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
361                            );
362                        }
363                    };
364
365                    // Not being managed || Already completed
366                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
367                    {
368                        continue;
369                    }
370                    if contingent_order.client_order_id() != order.client_order_id() {
371                        self.cancel_order(&contingent_order);
372                    }
373                }
374            }
375            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
376            _ => {}
377        }
378    }
379
380    pub fn handle_contingencies(&mut self, order: OrderAny) {
381        let (filled_qty, leaves_qty, is_spawn_active) =
382            if let Some(exec_spawn_id) = order.exec_spawn_id() {
383                if let (Some(filled), Some(leaves)) = (
384                    self.cache
385                        .borrow()
386                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
387                    self.cache
388                        .borrow()
389                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
390                ) {
391                    (filled, leaves, leaves.raw > 0)
392                } else {
393                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
394                    return;
395                }
396            } else {
397                (order.filled_qty(), order.leaves_qty(), false)
398            };
399
400        let linked_orders = if let Some(orders) = order.linked_order_ids() {
401            orders
402        } else {
403            log::error!("No linked orders found");
404            return;
405        };
406
407        for client_order_id in linked_orders {
408            let mut contingent_order =
409                if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
410                    order
411                } else {
412                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
413                };
414
415            if !self.should_manage_order(&contingent_order)
416                || client_order_id == &order.client_order_id()
417            {
418                continue;
419            }
420
421            if contingent_order.is_closed() {
422                self.submit_order_commands.remove(&order.client_order_id());
423                continue;
424            }
425
426            match order.contingency_type() {
427                Some(ContingencyType::Oto) => {
428                    if order.is_closed()
429                        && filled_qty.raw == 0
430                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
431                    {
432                        self.cancel_order(&contingent_order);
433                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
434                        self.modify_order_quantity(&mut contingent_order, filled_qty);
435                    }
436                }
437                Some(ContingencyType::Oco) => {
438                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
439                        self.cancel_order(&contingent_order);
440                    }
441                }
442                Some(ContingencyType::Ouo) => {
443                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
444                        || (order.is_closed()
445                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
446                    {
447                        self.cancel_order(&contingent_order);
448                    } else if leaves_qty != contingent_order.leaves_qty() {
449                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
450                    }
451                }
452                _ => {}
453            }
454        }
455    }
456
457    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
458        let quantity = match order.exec_spawn_id() {
459            Some(exec_spawn_id) => {
460                if let Some(qty) = self
461                    .cache
462                    .borrow()
463                    .exec_spawn_total_quantity(&exec_spawn_id, true)
464                {
465                    qty
466                } else {
467                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
468                    return;
469                }
470            }
471            None => order.quantity(),
472        };
473
474        if quantity.raw == 0 {
475            return;
476        }
477
478        let linked_orders = if let Some(orders) = order.linked_order_ids() {
479            orders
480        } else {
481            log::error!("No linked orders found for contingent order");
482            return;
483        };
484
485        for client_order_id in linked_orders {
486            let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
487                Some(contingent_order) => contingent_order,
488                None => panic!(
489                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
490                ),
491            };
492
493            if !self.should_manage_order(&contingent_order)
494                || client_order_id == &order.client_order_id()
495                || contingent_order.is_closed()
496            {
497                continue;
498            }
499
500            if let Some(contingency_type) = order.contingency_type() {
501                if matches!(
502                    contingency_type,
503                    ContingencyType::Oto | ContingencyType::Oco
504                ) && quantity != contingent_order.quantity()
505                {
506                    self.modify_order_quantity(&mut contingent_order, quantity);
507                }
508            }
509        }
510    }
511
/// Placeholder for position event handling; not implemented yet.
///
/// The `_event` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is `todo!()`.
pub fn handle_position_event(&mut self, _event: OrderEventAny) {
    todo!()
}
515
516    // Message sending methods
517    pub fn send_emulator_command(&self, command: TradingCommand) {
518        log::info!("{CMD}{SENT} {command}");
519
520        msgbus::send(&Ustr::from("OrderEmulator.execute"), &command);
521    }
522
523    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
524        log::info!("{CMD}{SENT} {command}");
525
526        let endpoint = format!("{exec_algorithm_id}.execute");
527        msgbus::send(
528            &Ustr::from(&endpoint),
529            &TradingCommand::SubmitOrder(command),
530        );
531    }
532
533    pub fn send_risk_command(&self, command: TradingCommand) {
534        log::info!("{CMD}{SENT} {command}");
535        msgbus::send(&Ustr::from("RiskEngine.execute"), &command);
536    }
537
538    pub fn send_exec_command(&self, command: TradingCommand) {
539        log::info!("{CMD}{SENT} {command}");
540        msgbus::send(&Ustr::from("ExecEngine.execute"), &command);
541    }
542
543    pub fn send_risk_event(&self, event: OrderEventAny) {
544        log::info!("{EVT}{SENT} {event}");
545        msgbus::send(&Ustr::from("RiskEngine.process"), &event);
546    }
547
548    pub fn send_exec_event(&self, event: OrderEventAny) {
549        log::info!("{EVT}{SENT} {event}");
550        msgbus::send(&Ustr::from("ExecEngine.process"), &event);
551    }
552}