1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 logging::{CMD, EVT, SEND},
27 messages::execution::{SubmitOrder, TradingCommand},
28 msgbus,
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32 enums::{ContingencyType, TriggerType},
33 events::{
34 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
35 },
36 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
37 orders::{Order, OrderAny},
38 types::Quantity,
39};
40
41pub struct OrderManager {
48 clock: Rc<RefCell<dyn Clock>>,
49 cache: Rc<RefCell<Cache>>,
50 active_local: bool,
51 submit_order_commands: AHashMap<ClientOrderId, SubmitOrder>,
55}
56
57impl Debug for OrderManager {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct(stringify!(OrderManager))
60 .field("pending_commands", &self.submit_order_commands.len())
61 .finish()
62 }
63}
64
65impl OrderManager {
66 pub fn new(
68 clock: Rc<RefCell<dyn Clock>>,
69 cache: Rc<RefCell<Cache>>,
70 active_local: bool,
71 ) -> Self {
75 Self {
76 clock,
77 cache,
78 active_local,
79 submit_order_commands: AHashMap::new(),
83 }
84 }
85
86 #[must_use]
99 pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
101 self.submit_order_commands.clone()
102 }
103
104 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
106 self.submit_order_commands
107 .insert(command.order.client_order_id(), command);
108 }
109
110 pub fn pop_submit_order_command(
112 &mut self,
113 client_order_id: ClientOrderId,
114 ) -> Option<SubmitOrder> {
115 self.submit_order_commands.remove(&client_order_id)
116 }
117
118 pub fn reset(&mut self) {
120 self.submit_order_commands.clear();
121 }
122
123 pub fn cancel_order(&mut self, order: &OrderAny) {
125 if self
126 .cache
127 .borrow()
128 .is_order_pending_cancel_local(&order.client_order_id())
129 {
130 return;
131 }
132
133 if order.is_closed() {
134 log::warn!("Cannot cancel order: already closed");
135 return;
136 }
137
138 self.submit_order_commands.remove(&order.client_order_id());
139
140 }
144
145 pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
147 }
151
152 pub fn create_new_submit_order(
156 &mut self,
157 order: &OrderAny,
158 position_id: Option<PositionId>,
159 client_id: Option<ClientId>,
160 ) -> anyhow::Result<()> {
161 let submit = SubmitOrder::new(
162 order.trader_id(),
163 client_id,
164 order.strategy_id(),
165 order.instrument_id(),
166 order.clone(),
167 order.exec_algorithm_id(),
168 position_id,
169 None, UUID4::new(),
171 self.clock.borrow().timestamp_ns(),
172 );
173
174 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
175 self.cache_submit_order_command(submit.clone());
176
177 match order.exec_algorithm_id() {
178 Some(exec_algorithm_id) => {
179 self.send_algo_command(submit, exec_algorithm_id);
180 }
181 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
182 }
183 } Ok(())
188 }
189
190 #[must_use]
191 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
193 self.active_local && order.is_active_local()
194 }
195
196 pub fn handle_event(&mut self, event: OrderEventAny) {
202 match event {
203 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
204 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
205 OrderEventAny::Expired(event) => self.handle_order_expired(event),
206 OrderEventAny::Updated(event) => self.handle_order_updated(event),
207 OrderEventAny::Filled(event) => self.handle_order_filled(event),
208 _ => {}
209 }
210 }
211
212 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
214 let cloned_order = self
215 .cache
216 .borrow()
217 .order(&rejected.client_order_id)
218 .cloned();
219 if let Some(order) = cloned_order {
220 if order.contingency_type() != Some(ContingencyType::NoContingency) {
221 self.handle_contingencies(order);
222 }
223 } else {
224 log::error!(
225 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
226 rejected.client_order_id,
227 rejected
228 );
229 }
230 }
231
232 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
233 let cloned_order = self
234 .cache
235 .borrow()
236 .order(&canceled.client_order_id)
237 .cloned();
238 if let Some(order) = cloned_order {
239 if order.contingency_type() != Some(ContingencyType::NoContingency) {
240 self.handle_contingencies(order);
241 }
242 } else {
243 log::error!(
244 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
245 canceled.client_order_id,
246 canceled
247 );
248 }
249 }
250
251 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
252 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
253 if let Some(order) = cloned_order {
254 if order.contingency_type() != Some(ContingencyType::NoContingency) {
255 self.handle_contingencies(order);
256 }
257 } else {
258 log::error!(
259 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
260 expired.client_order_id,
261 expired
262 );
263 }
264 }
265
266 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
267 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
268 if let Some(order) = cloned_order {
269 if order.contingency_type() != Some(ContingencyType::NoContingency) {
270 self.handle_contingencies_update(order);
271 }
272 } else {
273 log::error!(
274 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
275 updated.client_order_id,
276 updated
277 );
278 }
279 }
280
281 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
285 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
286 {
287 order
288 } else {
289 log::error!(
290 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
291 filled.client_order_id,
292 filled
293 );
294 return;
295 };
296
297 match order.contingency_type() {
298 Some(ContingencyType::Oto) => {
299 let position_id = self
300 .cache
301 .borrow()
302 .position_id(&order.client_order_id())
303 .copied();
304 let client_id = self
305 .cache
306 .borrow()
307 .client_id(&order.client_order_id())
308 .copied();
309
310 let parent_filled_qty = match order.exec_spawn_id() {
311 Some(spawn_id) => {
312 if let Some(qty) = self
313 .cache
314 .borrow()
315 .exec_spawn_total_filled_qty(&spawn_id, true)
316 {
317 qty
318 } else {
319 log::error!("Failed to get spawn filled quantity for {spawn_id}");
320 return;
321 }
322 }
323 None => order.filled_qty(),
324 };
325
326 let linked_orders = if let Some(orders) = order.linked_order_ids() {
327 orders
328 } else {
329 log::error!("No linked orders found for OTO order");
330 return;
331 };
332
333 for client_order_id in linked_orders {
334 let mut child_order =
335 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
336 order
337 } else {
338 panic!(
339 "Cannot find OTO child order for client_order_id: {client_order_id}"
340 );
341 };
342
343 if !self.should_manage_order(&child_order) {
344 continue;
345 }
346
347 if child_order.position_id().is_none() {
348 child_order.set_position_id(position_id);
349 }
350
351 if parent_filled_qty != child_order.leaves_qty() {
352 self.modify_order_quantity(&mut child_order, parent_filled_qty);
353 }
354
355 if !self
360 .submit_order_commands
361 .contains_key(&child_order.client_order_id())
362 && let Err(e) =
363 self.create_new_submit_order(&child_order, position_id, client_id)
364 {
365 log::error!("Failed to create new submit order: {e}");
366 }
367 }
368 }
369 Some(ContingencyType::Oco) => {
370 let linked_orders = if let Some(orders) = order.linked_order_ids() {
371 orders
372 } else {
373 log::error!("No linked orders found for OCO order");
374 return;
375 };
376
377 for client_order_id in linked_orders {
378 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
379 {
380 Some(contingent_order) => contingent_order,
381 None => {
382 panic!(
383 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
384 );
385 }
386 };
387
388 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
390 {
391 continue;
392 }
393 if contingent_order.client_order_id() != order.client_order_id() {
394 self.cancel_order(&contingent_order);
395 }
396 }
397 }
398 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
399 _ => {}
400 }
401 }
402
403 pub fn handle_contingencies(&mut self, order: OrderAny) {
407 let (filled_qty, leaves_qty, is_spawn_active) =
408 if let Some(exec_spawn_id) = order.exec_spawn_id() {
409 if let (Some(filled), Some(leaves)) = (
410 self.cache
411 .borrow()
412 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
413 self.cache
414 .borrow()
415 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
416 ) {
417 (filled, leaves, leaves.raw > 0)
418 } else {
419 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
420 return;
421 }
422 } else {
423 (order.filled_qty(), order.leaves_qty(), false)
424 };
425
426 let linked_orders = if let Some(orders) = order.linked_order_ids() {
427 orders
428 } else {
429 log::error!("No linked orders found");
430 return;
431 };
432
433 for client_order_id in linked_orders {
434 let mut contingent_order =
435 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
436 order
437 } else {
438 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
439 };
440
441 if !self.should_manage_order(&contingent_order)
442 || client_order_id == &order.client_order_id()
443 {
444 continue;
445 }
446
447 if contingent_order.is_closed() {
448 self.submit_order_commands.remove(&order.client_order_id());
449 continue;
450 }
451
452 match order.contingency_type() {
453 Some(ContingencyType::Oto) => {
454 if order.is_closed()
455 && filled_qty.raw == 0
456 && (order.exec_spawn_id().is_none() || !is_spawn_active)
457 {
458 self.cancel_order(&contingent_order);
459 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
460 self.modify_order_quantity(&mut contingent_order, filled_qty);
461 }
462 }
463 Some(ContingencyType::Oco) => {
464 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
465 self.cancel_order(&contingent_order);
466 }
467 }
468 Some(ContingencyType::Ouo) => {
469 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
470 || (order.is_closed()
471 && (order.exec_spawn_id().is_none() || !is_spawn_active))
472 {
473 self.cancel_order(&contingent_order);
474 } else if leaves_qty != contingent_order.leaves_qty() {
475 self.modify_order_quantity(&mut contingent_order, leaves_qty);
476 }
477 }
478 _ => {}
479 }
480 }
481 }
482
483 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
487 let quantity = match order.exec_spawn_id() {
488 Some(exec_spawn_id) => {
489 if let Some(qty) = self
490 .cache
491 .borrow()
492 .exec_spawn_total_quantity(&exec_spawn_id, true)
493 {
494 qty
495 } else {
496 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
497 return;
498 }
499 }
500 None => order.quantity(),
501 };
502
503 if quantity.raw == 0 {
504 return;
505 }
506
507 let linked_orders = if let Some(orders) = order.linked_order_ids() {
508 orders
509 } else {
510 log::error!("No linked orders found for contingent order");
511 return;
512 };
513
514 for client_order_id in linked_orders {
515 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
516 Some(contingent_order) => contingent_order,
517 None => panic!(
518 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
519 ),
520 };
521
522 if !self.should_manage_order(&contingent_order)
523 || client_order_id == &order.client_order_id()
524 || contingent_order.is_closed()
525 {
526 continue;
527 }
528
529 if let Some(contingency_type) = order.contingency_type()
530 && matches!(
531 contingency_type,
532 ContingencyType::Oto | ContingencyType::Oco
533 )
534 && quantity != contingent_order.quantity()
535 {
536 self.modify_order_quantity(&mut contingent_order, quantity);
537 }
538 }
539 }
540
541 pub fn send_emulator_command(&self, command: TradingCommand) {
543 log_cmd_send(&command);
544 msgbus::send_any("OrderEmulator.execute".into(), &command);
545 }
546
547 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
548 let id = command.strategy_id;
549 log::info!("{id} {CMD}{SEND} {command}");
550
551 let endpoint = format!("{exec_algorithm_id}.execute");
552 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
553 }
554
555 pub fn send_risk_command(&self, command: TradingCommand) {
556 log_cmd_send(&command);
557 msgbus::send_any("RiskEngine.execute".into(), &command);
558 }
559
560 pub fn send_exec_command(&self, command: TradingCommand) {
561 log_cmd_send(&command);
562 msgbus::send_any("ExecEngine.execute".into(), &command);
563 }
564
565 pub fn send_risk_event(&self, event: OrderEventAny) {
566 log_evt_send(&event);
567 msgbus::send_any("RiskEngine.process".into(), &event);
568 }
569
570 pub fn send_exec_event(&self, event: OrderEventAny) {
571 log_evt_send(&event);
572 msgbus::send_any("ExecEngine.process".into(), &event);
573 }
574}
575
576#[inline(always)]
577fn log_cmd_send(command: &TradingCommand) {
578 if let Some(id) = command.strategy_id() {
579 log::info!("{id} {CMD}{SEND} {command}");
580 } else {
581 log::info!("{CMD}{SEND} {command}");
582 }
583}
584
585#[inline(always)]
586fn log_evt_send(event: &OrderEventAny) {
587 let id = event.strategy_id();
588 log::info!("{id} {EVT}{SEND} {event}");
589}
590
591#[cfg(test)]
592mod tests {
593 use nautilus_core::UUID4;
594 use nautilus_model::{
595 events::{OrderAccepted, OrderSubmitted},
596 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
597 };
598 use rstest::rstest;
599
600 use super::*;
601
602 #[rstest]
605 fn test_handle_event_unhandled_events_are_noop() {
606 let submitted = OrderEventAny::Submitted(OrderSubmitted {
607 trader_id: TraderId::from("TRADER-001"),
608 strategy_id: StrategyId::from("STRATEGY-001"),
609 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
610 client_order_id: ClientOrderId::from("O-001"),
611 account_id: AccountId::from("ACCOUNT-001"),
612 event_id: UUID4::new(),
613 ts_event: Default::default(),
614 ts_init: Default::default(),
615 });
616 let accepted = OrderEventAny::Accepted(OrderAccepted {
617 trader_id: TraderId::from("TRADER-001"),
618 strategy_id: StrategyId::from("STRATEGY-001"),
619 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
620 client_order_id: ClientOrderId::from("O-001"),
621 venue_order_id: VenueOrderId::from("V-001"),
622 account_id: AccountId::from("ACCOUNT-001"),
623 event_id: UUID4::new(),
624 ts_event: Default::default(),
625 ts_init: Default::default(),
626 reconciliation: 0,
627 });
628
629 match submitted {
630 OrderEventAny::Rejected(_) => panic!("Should not match"),
631 OrderEventAny::Canceled(_) => panic!("Should not match"),
632 OrderEventAny::Expired(_) => panic!("Should not match"),
633 OrderEventAny::Updated(_) => panic!("Should not match"),
634 OrderEventAny::Filled(_) => panic!("Should not match"),
635 _ => {}
636 }
637 match accepted {
638 OrderEventAny::Rejected(_) => panic!("Should not match"),
639 OrderEventAny::Canceled(_) => panic!("Should not match"),
640 OrderEventAny::Expired(_) => panic!("Should not match"),
641 OrderEventAny::Updated(_) => panic!("Should not match"),
642 OrderEventAny::Filled(_) => panic!("Should not match"),
643 _ => {}
644 }
645 }
646}