1use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use ahash::AHashMap;
19use nautilus_common::{
20 cache::Cache,
21 clock::Clock,
22 logging::{CMD, EVT, SEND},
23 messages::execution::{SubmitOrder, TradingCommand},
24 msgbus,
25 msgbus::MessagingSwitchboard,
26};
27use nautilus_core::UUID4;
28use nautilus_model::{
29 enums::{ContingencyType, TriggerType},
30 events::{
31 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
32 },
33 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
34 orders::{Order, OrderAny},
35 types::Quantity,
36};
37
38use super::handlers::{
39 CancelOrderHandler, CancelOrderHandlerAny, ModifyOrderHandler, ModifyOrderHandlerAny,
40 SubmitOrderHandler, SubmitOrderHandlerAny,
41};
42
/// Coordinates locally managed orders: caches pending `SubmitOrder` commands,
/// forwards cancel/modify/submit requests to optional handlers, and reacts to
/// order events by updating or canceling linked (contingent) orders.
pub struct OrderManager {
    /// Clock used to timestamp newly created `SubmitOrder` commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, positions and client IDs, and to
    /// store orders when a new submit command is created.
    cache: Rc<RefCell<Cache>>,
    /// Gate for `should_manage_order`: an order is only managed when this flag
    /// is set and the order itself reports `is_active_local()`.
    active_local: bool,
    /// Optional handler invoked for submit commands that are not sent
    /// directly to the risk engine or an execution algorithm.
    submit_order_handler: Option<SubmitOrderHandlerAny>,
    /// Optional handler invoked by `cancel_order`.
    cancel_order_handler: Option<CancelOrderHandlerAny>,
    /// Optional handler invoked by `modify_order_quantity`.
    modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// Cached submit commands keyed by the command's client order ID.
    submit_order_commands: AHashMap<ClientOrderId, SubmitOrder>,
}
58
59impl Debug for OrderManager {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct(stringify!(OrderManager))
62 .field("pending_commands", &self.submit_order_commands.len())
63 .finish()
64 }
65}
66
67impl OrderManager {
68 pub fn new(
70 clock: Rc<RefCell<dyn Clock>>,
71 cache: Rc<RefCell<Cache>>,
72 active_local: bool,
73 submit_order_handler: Option<SubmitOrderHandlerAny>,
74 cancel_order_handler: Option<CancelOrderHandlerAny>,
75 modify_order_handler: Option<ModifyOrderHandlerAny>,
76 ) -> Self {
77 Self {
78 clock,
79 cache,
80 active_local,
81 submit_order_handler,
82 cancel_order_handler,
83 modify_order_handler,
84 submit_order_commands: AHashMap::new(),
85 }
86 }
87
88 pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
90 self.submit_order_handler = Some(handler);
91 }
92
93 pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
95 self.cancel_order_handler = Some(handler);
96 }
97
98 pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
100 self.modify_order_handler = Some(handler);
101 }
102
103 #[must_use]
104 pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
106 self.submit_order_commands.clone()
107 }
108
109 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
111 self.submit_order_commands
112 .insert(command.client_order_id, command);
113 }
114
115 pub fn pop_submit_order_command(
117 &mut self,
118 client_order_id: ClientOrderId,
119 ) -> Option<SubmitOrder> {
120 self.submit_order_commands.remove(&client_order_id)
121 }
122
123 pub fn reset(&mut self) {
125 self.submit_order_commands.clear();
126 }
127
128 pub fn cancel_order(&mut self, order: &OrderAny) {
130 if self
131 .cache
132 .borrow()
133 .is_order_pending_cancel_local(&order.client_order_id())
134 {
135 return;
136 }
137
138 if order.is_closed() {
139 log::warn!("Cannot cancel order: already closed");
140 return;
141 }
142
143 self.submit_order_commands.remove(&order.client_order_id());
144
145 if let Some(handler) = &self.cancel_order_handler {
146 handler.handle_cancel_order(order);
147 }
148 }
149
150 pub fn modify_order_quantity(&mut self, order: &OrderAny, new_quantity: Quantity) {
152 if let Some(handler) = &self.modify_order_handler {
153 handler.handle_modify_order(order, new_quantity);
154 }
155 }
156
157 pub fn create_new_submit_order(
161 &mut self,
162 order: &OrderAny,
163 position_id: Option<PositionId>,
164 client_id: Option<ClientId>,
165 ) -> anyhow::Result<()> {
166 self.cache
167 .borrow_mut()
168 .add_order(order.clone(), position_id, client_id, true)?;
169
170 let submit = SubmitOrder::new(
171 order.trader_id(),
172 client_id,
173 order.strategy_id(),
174 order.instrument_id(),
175 order.client_order_id(),
176 order.init_event().clone(),
177 order.exec_algorithm_id(),
178 position_id,
179 None, UUID4::new(),
181 self.clock.borrow().timestamp_ns(),
182 );
183
184 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
185 self.cache_submit_order_command(submit.clone());
186
187 match order.exec_algorithm_id() {
188 Some(exec_algorithm_id) => {
189 self.send_algo_command(submit, exec_algorithm_id);
190 }
191 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
192 }
193 } else if let Some(handler) = self.submit_order_handler.clone() {
194 self.cache_submit_order_command(submit.clone());
195 handler.handle_submit_order(submit);
196 }
197
198 Ok(())
199 }
200
201 #[must_use]
202 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
204 self.active_local && order.is_active_local()
205 }
206
207 pub fn handle_event(&mut self, event: OrderEventAny) {
213 match event {
214 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
215 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
216 OrderEventAny::Expired(event) => self.handle_order_expired(event),
217 OrderEventAny::Updated(event) => self.handle_order_updated(event),
218 OrderEventAny::Filled(event) => self.handle_order_filled(event),
219 _ => {}
220 }
221 }
222
223 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
225 let cloned_order = self
226 .cache
227 .borrow()
228 .order(&rejected.client_order_id)
229 .cloned();
230 if let Some(order) = cloned_order {
231 if order.contingency_type() != Some(ContingencyType::NoContingency) {
232 self.handle_contingencies(order);
233 }
234 } else {
235 log::error!(
236 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
237 rejected.client_order_id,
238 rejected
239 );
240 }
241 }
242
243 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
244 let cloned_order = self
245 .cache
246 .borrow()
247 .order(&canceled.client_order_id)
248 .cloned();
249 if let Some(order) = cloned_order {
250 if order.contingency_type() != Some(ContingencyType::NoContingency) {
251 self.handle_contingencies(order);
252 }
253 } else {
254 log::error!(
255 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
256 canceled.client_order_id,
257 canceled
258 );
259 }
260 }
261
262 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
263 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
264 if let Some(order) = cloned_order {
265 if order.contingency_type() != Some(ContingencyType::NoContingency) {
266 self.handle_contingencies(order);
267 }
268 } else {
269 log::error!(
270 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
271 expired.client_order_id,
272 expired
273 );
274 }
275 }
276
277 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
278 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
279 if let Some(order) = cloned_order {
280 if order.contingency_type() != Some(ContingencyType::NoContingency) {
281 self.handle_contingencies_update(order);
282 }
283 } else {
284 log::error!(
285 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
286 updated.client_order_id,
287 updated
288 );
289 }
290 }
291
292 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
296 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
297 {
298 order
299 } else {
300 log::error!(
301 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
302 filled.client_order_id,
303 filled
304 );
305 return;
306 };
307
308 match order.contingency_type() {
309 Some(ContingencyType::Oto) => {
310 let position_id = self
311 .cache
312 .borrow()
313 .position_id(&order.client_order_id())
314 .copied();
315 let client_id = self
316 .cache
317 .borrow()
318 .client_id(&order.client_order_id())
319 .copied();
320
321 let parent_filled_qty = match order.exec_spawn_id() {
322 Some(spawn_id) => {
323 if let Some(qty) = self
324 .cache
325 .borrow()
326 .exec_spawn_total_filled_qty(&spawn_id, true)
327 {
328 qty
329 } else {
330 log::error!("Failed to get spawn filled quantity for {spawn_id}");
331 return;
332 }
333 }
334 None => order.filled_qty(),
335 };
336
337 let linked_orders = if let Some(orders) = order.linked_order_ids() {
338 orders
339 } else {
340 log::error!("No linked orders found for OTO order");
341 return;
342 };
343
344 for client_order_id in linked_orders {
345 let mut child_order =
346 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
347 order
348 } else {
349 panic!(
350 "Cannot find OTO child order for client_order_id: {client_order_id}"
351 );
352 };
353
354 if !self.should_manage_order(&child_order) {
355 continue;
356 }
357
358 if child_order.position_id().is_none() {
359 child_order.set_position_id(position_id);
360 }
361
362 if parent_filled_qty != child_order.leaves_qty() {
363 self.modify_order_quantity(&child_order, parent_filled_qty);
364 }
365
366 if !self
371 .submit_order_commands
372 .contains_key(&child_order.client_order_id())
373 && let Err(e) =
374 self.create_new_submit_order(&child_order, position_id, client_id)
375 {
376 log::error!("Failed to create new submit order: {e}");
377 }
378 }
379 }
380 Some(ContingencyType::Oco) => {
381 let linked_orders = if let Some(orders) = order.linked_order_ids() {
382 orders
383 } else {
384 log::error!("No linked orders found for OCO order");
385 return;
386 };
387
388 for client_order_id in linked_orders {
389 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
390 {
391 Some(contingent_order) => contingent_order,
392 None => {
393 panic!(
394 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
395 );
396 }
397 };
398
399 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
401 {
402 continue;
403 }
404 if contingent_order.client_order_id() != order.client_order_id() {
405 self.cancel_order(&contingent_order);
406 }
407 }
408 }
409 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
410 _ => {}
411 }
412 }
413
414 pub fn handle_contingencies(&mut self, order: OrderAny) {
418 let (filled_qty, leaves_qty, is_spawn_active) =
419 if let Some(exec_spawn_id) = order.exec_spawn_id() {
420 if let (Some(filled), Some(leaves)) = (
421 self.cache
422 .borrow()
423 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
424 self.cache
425 .borrow()
426 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
427 ) {
428 (filled, leaves, leaves.raw > 0)
429 } else {
430 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
431 return;
432 }
433 } else {
434 (order.filled_qty(), order.leaves_qty(), false)
435 };
436
437 let linked_orders = if let Some(orders) = order.linked_order_ids() {
438 orders
439 } else {
440 log::error!("No linked orders found");
441 return;
442 };
443
444 for client_order_id in linked_orders {
445 let contingent_order =
446 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
447 order
448 } else {
449 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
450 };
451
452 if !self.should_manage_order(&contingent_order)
453 || client_order_id == &order.client_order_id()
454 {
455 continue;
456 }
457
458 if contingent_order.is_closed() {
459 self.submit_order_commands.remove(&order.client_order_id());
460 continue;
461 }
462
463 match order.contingency_type() {
464 Some(ContingencyType::Oto) => {
465 if order.is_closed()
466 && filled_qty.raw == 0
467 && (order.exec_spawn_id().is_none() || !is_spawn_active)
468 {
469 self.cancel_order(&contingent_order);
470 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
471 self.modify_order_quantity(&contingent_order, filled_qty);
472 }
473 }
474 Some(ContingencyType::Oco) => {
475 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
476 self.cancel_order(&contingent_order);
477 }
478 }
479 Some(ContingencyType::Ouo) => {
480 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
481 || (order.is_closed()
482 && (order.exec_spawn_id().is_none() || !is_spawn_active))
483 {
484 self.cancel_order(&contingent_order);
485 } else if leaves_qty != contingent_order.leaves_qty() {
486 self.modify_order_quantity(&contingent_order, leaves_qty);
487 }
488 }
489 _ => {}
490 }
491 }
492 }
493
494 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
498 let quantity = match order.exec_spawn_id() {
499 Some(exec_spawn_id) => {
500 if let Some(qty) = self
501 .cache
502 .borrow()
503 .exec_spawn_total_quantity(&exec_spawn_id, true)
504 {
505 qty
506 } else {
507 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
508 return;
509 }
510 }
511 None => order.quantity(),
512 };
513
514 if quantity.raw == 0 {
515 return;
516 }
517
518 let linked_orders = if let Some(orders) = order.linked_order_ids() {
519 orders
520 } else {
521 log::error!("No linked orders found for contingent order");
522 return;
523 };
524
525 for client_order_id in linked_orders {
526 let contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
527 Some(contingent_order) => contingent_order,
528 None => panic!(
529 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
530 ),
531 };
532
533 if !self.should_manage_order(&contingent_order)
534 || client_order_id == &order.client_order_id()
535 || contingent_order.is_closed()
536 {
537 continue;
538 }
539
540 if let Some(contingency_type) = order.contingency_type()
541 && matches!(
542 contingency_type,
543 ContingencyType::Oto | ContingencyType::Oco
544 )
545 && quantity != contingent_order.quantity()
546 {
547 self.modify_order_quantity(&contingent_order, quantity);
548 }
549 }
550 }
551
552 pub fn send_emulator_command(&self, command: TradingCommand) {
554 log_cmd_send(&command);
555 let endpoint = MessagingSwitchboard::order_emulator_execute();
556 msgbus::send_trading_command(endpoint, command);
557 }
558
559 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
560 let id = command.strategy_id;
561 log::info!("{id} {CMD}{SEND} {command}");
562
563 let endpoint = format!("{exec_algorithm_id}.execute");
565 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
566 }
567
568 pub fn send_risk_command(&self, command: TradingCommand) {
569 log_cmd_send(&command);
570 let endpoint = MessagingSwitchboard::risk_engine_execute();
571 msgbus::send_trading_command(endpoint, command);
572 }
573
574 pub fn send_exec_command(&self, command: TradingCommand) {
575 log_cmd_send(&command);
576
577 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
580 msgbus::send_trading_command(endpoint, command);
581 }
582
583 pub fn send_risk_event(&self, event: OrderEventAny) {
584 log_evt_send(&event);
585 let endpoint = MessagingSwitchboard::risk_engine_process();
586 msgbus::send_order_event(endpoint, event);
587 }
588
589 pub fn send_exec_event(&self, event: OrderEventAny) {
590 log_evt_send(&event);
591 let endpoint = MessagingSwitchboard::exec_engine_process();
592 msgbus::send_order_event(endpoint, event);
593 }
594}
595
596#[inline(always)]
597fn log_cmd_send(command: &TradingCommand) {
598 if let Some(id) = command.strategy_id() {
599 log::info!("{id} {CMD}{SEND} {command}");
600 } else {
601 log::info!("{CMD}{SEND} {command}");
602 }
603}
604
605#[inline(always)]
606fn log_evt_send(event: &OrderEventAny) {
607 let id = event.strategy_id();
608 log::info!("{id} {EVT}{SEND} {event}");
609}
610
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock};
    use nautilus_core::{UUID4, WeakCell};
    use nautilus_model::{
        enums::{OrderSide, OrderType, TriggerType},
        events::{OrderAccepted, OrderSubmitted},
        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
        instruments::{Instrument, stubs::audusd_sim},
        orders::OrderTestBuilder,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;
    use crate::{
        order_emulator::emulator::OrderEmulator,
        order_manager::handlers::{
            CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny,
        },
    };

    /// `handle_event` must ignore event variants it has no handler for:
    /// feeding it `Submitted` and `Accepted` events must neither panic nor
    /// touch the cached submit commands.
    #[rstest]
    fn test_handle_event_unhandled_events_are_noop() {
        let (clock, cache, _emulator) = create_test_components();
        let mut manager = OrderManager::new(clock, cache, true, None, None, None);

        let submitted = OrderEventAny::Submitted(OrderSubmitted {
            trader_id: TraderId::from("TRADER-001"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            client_order_id: ClientOrderId::from("O-001"),
            account_id: AccountId::from("ACCOUNT-001"),
            event_id: UUID4::new(),
            ts_event: Default::default(),
            ts_init: Default::default(),
        });
        let accepted = OrderEventAny::Accepted(OrderAccepted {
            trader_id: TraderId::from("TRADER-001"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            client_order_id: ClientOrderId::from("O-001"),
            venue_order_id: VenueOrderId::from("V-001"),
            account_id: AccountId::from("ACCOUNT-001"),
            event_id: UUID4::new(),
            ts_event: Default::default(),
            ts_init: Default::default(),
            reconciliation: 0,
        });

        for event in [submitted, accepted] {
            // Guard the fixture: neither event is one of the handled variants.
            assert!(
                !matches!(
                    event,
                    OrderEventAny::Rejected(_)
                        | OrderEventAny::Canceled(_)
                        | OrderEventAny::Expired(_)
                        | OrderEventAny::Updated(_)
                        | OrderEventAny::Filled(_)
                ),
                "Should not match"
            );

            // Exercise the code under test; the order is not in the cache, so
            // any accidental dispatch would log an error rather than pass silently.
            manager.handle_event(event);
        }

        assert!(manager.get_submit_order_commands().is_empty());
    }

    /// Builds a test clock, an empty cache and an order emulator sharing both.
    #[allow(clippy::type_complexity)]
    fn create_test_components() -> (
        Rc<RefCell<dyn Clock>>,
        Rc<RefCell<Cache>>,
        Rc<RefCell<OrderEmulator>>,
    ) {
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
        let emulator = Rc::new(RefCell::new(OrderEmulator::new(
            clock.clone(),
            cache.clone(),
        )));
        (clock, cache, emulator)
    }

    /// Builds an emulated (bid/ask triggered) buy stop-market order on AUD/USD.
    fn create_test_stop_order() -> OrderAny {
        let instrument = audusd_sim();
        OrderTestBuilder::new(OrderType::StopMarket)
            .instrument_id(instrument.id())
            .side(OrderSide::Buy)
            .trigger_price(Price::from("1.00050"))
            .quantity(Quantity::from(100_000))
            .emulation_trigger(TriggerType::BidAsk)
            .build()
    }

    /// Handlers passed to the constructor are stored on the manager.
    #[rstest]
    fn test_order_manager_with_handlers() {
        let (clock, cache, emulator) = create_test_components();
        let submit_handler =
            SubmitOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
        let cancel_handler =
            CancelOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
        let modify_handler =
            ModifyOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));

        let manager = OrderManager::new(
            clock,
            cache,
            true,
            Some(submit_handler),
            Some(cancel_handler),
            Some(modify_handler),
        );

        assert!(manager.submit_order_handler.is_some());
        assert!(manager.cancel_order_handler.is_some());
        assert!(manager.modify_order_handler.is_some());
    }

    /// Smoke test: canceling through an emulator-backed handler must not panic.
    #[rstest]
    fn test_order_manager_cancel_order_dispatches_to_handler() {
        let (clock, cache, emulator) = create_test_components();
        let cancel_handler =
            CancelOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
        let mut manager =
            OrderManager::new(clock, cache.clone(), true, None, Some(cancel_handler), None);
        let order = create_test_stop_order();
        cache
            .borrow_mut()
            .add_order(order.clone(), None, None, false)
            .unwrap();

        manager.cancel_order(&order);
    }

    /// Smoke test: modifying through an emulator-backed handler must not panic.
    #[rstest]
    fn test_order_manager_modify_order_dispatches_to_handler() {
        let (clock, cache, emulator) = create_test_components();
        let modify_handler =
            ModifyOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
        let mut manager = OrderManager::new(clock, cache, true, None, None, Some(modify_handler));
        let order = create_test_stop_order();
        let new_quantity = Quantity::from(50_000);

        manager.modify_order_quantity(&order, new_quantity);
    }

    /// Smoke test: cancel and modify are safe no-ops when no handlers are set.
    #[rstest]
    fn test_order_manager_without_handlers() {
        let (clock, cache, _emulator) = create_test_components();
        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
        let order = create_test_stop_order();
        cache
            .borrow_mut()
            .add_order(order.clone(), None, None, false)
            .unwrap();

        manager.cancel_order(&order);
        manager.modify_order_quantity(&order, Quantity::from(50_000));
    }
}