1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 logging::{CMD, EVT, SEND},
26 messages::execution::{SubmitOrder, TradingCommand},
27 msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31 enums::{ContingencyType, TriggerType},
32 events::{
33 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34 },
35 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36 orders::{Order, OrderAny},
37 types::Quantity,
38};
39
40pub struct OrderManager {
47 clock: Rc<RefCell<dyn Clock>>,
48 cache: Rc<RefCell<Cache>>,
49 active_local: bool,
50 submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
54}
55
56impl Debug for OrderManager {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct(stringify!(OrderManager))
59 .field("pending_commands", &self.submit_order_commands.len())
60 .finish()
61 }
62}
63
64impl OrderManager {
65 pub fn new(
67 clock: Rc<RefCell<dyn Clock>>,
68 cache: Rc<RefCell<Cache>>,
69 active_local: bool,
70 ) -> Self {
74 Self {
75 clock,
76 cache,
77 active_local,
78 submit_order_commands: HashMap::new(),
82 }
83 }
84
85 #[must_use]
98 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100 self.submit_order_commands.clone()
101 }
102
103 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105 self.submit_order_commands
106 .insert(command.order.client_order_id(), command);
107 }
108
109 pub fn pop_submit_order_command(
111 &mut self,
112 client_order_id: ClientOrderId,
113 ) -> Option<SubmitOrder> {
114 self.submit_order_commands.remove(&client_order_id)
115 }
116
117 pub fn reset(&mut self) {
119 self.submit_order_commands.clear();
120 }
121
122 pub fn cancel_order(&mut self, order: &OrderAny) {
124 if self
125 .cache
126 .borrow()
127 .is_order_pending_cancel_local(&order.client_order_id())
128 {
129 return;
130 }
131
132 if order.is_closed() {
133 log::warn!("Cannot cancel order: already closed");
134 return;
135 }
136
137 self.submit_order_commands.remove(&order.client_order_id());
138
139 }
143
144 pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
146 }
150
151 pub fn create_new_submit_order(
155 &mut self,
156 order: &OrderAny,
157 position_id: Option<PositionId>,
158 client_id: Option<ClientId>,
159 ) -> anyhow::Result<()> {
160 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161 let venue_order_id = order
162 .venue_order_id()
163 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165 let submit = SubmitOrder::new(
166 order.trader_id(),
167 client_id,
168 order.strategy_id(),
169 order.instrument_id(),
170 order.client_order_id(),
171 venue_order_id,
172 order.clone(),
173 order.exec_algorithm_id(),
174 position_id,
175 None, UUID4::new(),
177 self.clock.borrow().timestamp_ns(),
178 )?;
179
180 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
181 self.cache_submit_order_command(submit.clone());
182
183 match order.exec_algorithm_id() {
184 Some(exec_algorithm_id) => {
185 self.send_algo_command(submit, exec_algorithm_id);
186 }
187 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
188 }
189 } Ok(())
194 }
195
196 #[must_use]
197 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
199 self.active_local && order.is_active_local()
200 }
201
202 pub fn handle_event(&mut self, event: OrderEventAny) {
208 match event {
209 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
210 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
211 OrderEventAny::Expired(event) => self.handle_order_expired(event),
212 OrderEventAny::Updated(event) => self.handle_order_updated(event),
213 OrderEventAny::Filled(event) => self.handle_order_filled(event),
214 _ => {}
215 }
216 }
217
218 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
220 let cloned_order = self
221 .cache
222 .borrow()
223 .order(&rejected.client_order_id)
224 .cloned();
225 if let Some(order) = cloned_order {
226 if order.contingency_type() != Some(ContingencyType::NoContingency) {
227 self.handle_contingencies(order);
228 }
229 } else {
230 log::error!(
231 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
232 rejected.client_order_id,
233 rejected
234 );
235 }
236 }
237
238 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
239 let cloned_order = self
240 .cache
241 .borrow()
242 .order(&canceled.client_order_id)
243 .cloned();
244 if let Some(order) = cloned_order {
245 if order.contingency_type() != Some(ContingencyType::NoContingency) {
246 self.handle_contingencies(order);
247 }
248 } else {
249 log::error!(
250 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
251 canceled.client_order_id,
252 canceled
253 );
254 }
255 }
256
257 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
258 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
259 if let Some(order) = cloned_order {
260 if order.contingency_type() != Some(ContingencyType::NoContingency) {
261 self.handle_contingencies(order);
262 }
263 } else {
264 log::error!(
265 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
266 expired.client_order_id,
267 expired
268 );
269 }
270 }
271
272 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
273 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
274 if let Some(order) = cloned_order {
275 if order.contingency_type() != Some(ContingencyType::NoContingency) {
276 self.handle_contingencies_update(order);
277 }
278 } else {
279 log::error!(
280 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
281 updated.client_order_id,
282 updated
283 );
284 }
285 }
286
287 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
291 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
292 {
293 order
294 } else {
295 log::error!(
296 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
297 filled.client_order_id,
298 filled
299 );
300 return;
301 };
302
303 match order.contingency_type() {
304 Some(ContingencyType::Oto) => {
305 let position_id = self
306 .cache
307 .borrow()
308 .position_id(&order.client_order_id())
309 .copied();
310 let client_id = self
311 .cache
312 .borrow()
313 .client_id(&order.client_order_id())
314 .copied();
315
316 let parent_filled_qty = match order.exec_spawn_id() {
317 Some(spawn_id) => {
318 if let Some(qty) = self
319 .cache
320 .borrow()
321 .exec_spawn_total_filled_qty(&spawn_id, true)
322 {
323 qty
324 } else {
325 log::error!("Failed to get spawn filled quantity for {spawn_id}");
326 return;
327 }
328 }
329 None => order.filled_qty(),
330 };
331
332 let linked_orders = if let Some(orders) = order.linked_order_ids() {
333 orders
334 } else {
335 log::error!("No linked orders found for OTO order");
336 return;
337 };
338
339 for client_order_id in linked_orders {
340 let mut child_order =
341 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
342 order
343 } else {
344 panic!(
345 "Cannot find OTO child order for client_order_id: {client_order_id}"
346 );
347 };
348
349 if !self.should_manage_order(&child_order) {
350 continue;
351 }
352
353 if child_order.position_id().is_none() {
354 child_order.set_position_id(position_id);
355 }
356
357 if parent_filled_qty != child_order.leaves_qty() {
358 self.modify_order_quantity(&mut child_order, parent_filled_qty);
359 }
360
361 if !self
366 .submit_order_commands
367 .contains_key(&child_order.client_order_id())
368 && let Err(e) =
369 self.create_new_submit_order(&child_order, position_id, client_id)
370 {
371 log::error!("Failed to create new submit order: {e}");
372 }
373 }
374 }
375 Some(ContingencyType::Oco) => {
376 let linked_orders = if let Some(orders) = order.linked_order_ids() {
377 orders
378 } else {
379 log::error!("No linked orders found for OCO order");
380 return;
381 };
382
383 for client_order_id in linked_orders {
384 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
385 {
386 Some(contingent_order) => contingent_order,
387 None => {
388 panic!(
389 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
390 );
391 }
392 };
393
394 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
396 {
397 continue;
398 }
399 if contingent_order.client_order_id() != order.client_order_id() {
400 self.cancel_order(&contingent_order);
401 }
402 }
403 }
404 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
405 _ => {}
406 }
407 }
408
409 pub fn handle_contingencies(&mut self, order: OrderAny) {
413 let (filled_qty, leaves_qty, is_spawn_active) =
414 if let Some(exec_spawn_id) = order.exec_spawn_id() {
415 if let (Some(filled), Some(leaves)) = (
416 self.cache
417 .borrow()
418 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
419 self.cache
420 .borrow()
421 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
422 ) {
423 (filled, leaves, leaves.raw > 0)
424 } else {
425 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
426 return;
427 }
428 } else {
429 (order.filled_qty(), order.leaves_qty(), false)
430 };
431
432 let linked_orders = if let Some(orders) = order.linked_order_ids() {
433 orders
434 } else {
435 log::error!("No linked orders found");
436 return;
437 };
438
439 for client_order_id in linked_orders {
440 let mut contingent_order =
441 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
442 order
443 } else {
444 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
445 };
446
447 if !self.should_manage_order(&contingent_order)
448 || client_order_id == &order.client_order_id()
449 {
450 continue;
451 }
452
453 if contingent_order.is_closed() {
454 self.submit_order_commands.remove(&order.client_order_id());
455 continue;
456 }
457
458 match order.contingency_type() {
459 Some(ContingencyType::Oto) => {
460 if order.is_closed()
461 && filled_qty.raw == 0
462 && (order.exec_spawn_id().is_none() || !is_spawn_active)
463 {
464 self.cancel_order(&contingent_order);
465 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
466 self.modify_order_quantity(&mut contingent_order, filled_qty);
467 }
468 }
469 Some(ContingencyType::Oco) => {
470 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
471 self.cancel_order(&contingent_order);
472 }
473 }
474 Some(ContingencyType::Ouo) => {
475 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
476 || (order.is_closed()
477 && (order.exec_spawn_id().is_none() || !is_spawn_active))
478 {
479 self.cancel_order(&contingent_order);
480 } else if leaves_qty != contingent_order.leaves_qty() {
481 self.modify_order_quantity(&mut contingent_order, leaves_qty);
482 }
483 }
484 _ => {}
485 }
486 }
487 }
488
489 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
493 let quantity = match order.exec_spawn_id() {
494 Some(exec_spawn_id) => {
495 if let Some(qty) = self
496 .cache
497 .borrow()
498 .exec_spawn_total_quantity(&exec_spawn_id, true)
499 {
500 qty
501 } else {
502 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
503 return;
504 }
505 }
506 None => order.quantity(),
507 };
508
509 if quantity.raw == 0 {
510 return;
511 }
512
513 let linked_orders = if let Some(orders) = order.linked_order_ids() {
514 orders
515 } else {
516 log::error!("No linked orders found for contingent order");
517 return;
518 };
519
520 for client_order_id in linked_orders {
521 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
522 Some(contingent_order) => contingent_order,
523 None => panic!(
524 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
525 ),
526 };
527
528 if !self.should_manage_order(&contingent_order)
529 || client_order_id == &order.client_order_id()
530 || contingent_order.is_closed()
531 {
532 continue;
533 }
534
535 if let Some(contingency_type) = order.contingency_type()
536 && matches!(
537 contingency_type,
538 ContingencyType::Oto | ContingencyType::Oco
539 )
540 && quantity != contingent_order.quantity()
541 {
542 self.modify_order_quantity(&mut contingent_order, quantity);
543 }
544 }
545 }
546
547 pub fn send_emulator_command(&self, command: TradingCommand) {
549 log_cmd_send(&command);
550 msgbus::send_any("OrderEmulator.execute".into(), &command);
551 }
552
553 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
554 let id = command.strategy_id;
555 log::info!("{id} {CMD}{SEND} {command}");
556
557 let endpoint = format!("{exec_algorithm_id}.execute");
558 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
559 }
560
561 pub fn send_risk_command(&self, command: TradingCommand) {
562 log_cmd_send(&command);
563 msgbus::send_any("RiskEngine.execute".into(), &command);
564 }
565
566 pub fn send_exec_command(&self, command: TradingCommand) {
567 log_cmd_send(&command);
568 msgbus::send_any("ExecEngine.execute".into(), &command);
569 }
570
571 pub fn send_risk_event(&self, event: OrderEventAny) {
572 log_evt_send(&event);
573 msgbus::send_any("RiskEngine.process".into(), &event);
574 }
575
576 pub fn send_exec_event(&self, event: OrderEventAny) {
577 log_evt_send(&event);
578 msgbus::send_any("ExecEngine.process".into(), &event);
579 }
580}
581
582#[inline(always)]
583fn log_cmd_send(command: &TradingCommand) {
584 if let Some(id) = command.strategy_id() {
585 log::info!("{id} {CMD}{SEND} {command}");
586 } else {
587 log::info!("{CMD}{SEND} {command}");
588 }
589}
590
591#[inline(always)]
592fn log_evt_send(event: &OrderEventAny) {
593 let id = event.strategy_id();
594 log::info!("{id} {EVT}{SEND} {event}");
595}
596
597#[cfg(test)]
598mod tests {
599 use nautilus_core::UUID4;
600 use nautilus_model::{
601 events::{OrderAccepted, OrderSubmitted},
602 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
603 };
604 use rstest::rstest;
605
606 use super::*;
607
608 #[rstest]
611 fn test_handle_event_unhandled_events_are_noop() {
612 let submitted = OrderEventAny::Submitted(OrderSubmitted {
613 trader_id: TraderId::from("TRADER-001"),
614 strategy_id: StrategyId::from("STRATEGY-001"),
615 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
616 client_order_id: ClientOrderId::from("O-001"),
617 account_id: AccountId::from("ACCOUNT-001"),
618 event_id: UUID4::new(),
619 ts_event: Default::default(),
620 ts_init: Default::default(),
621 });
622 let accepted = OrderEventAny::Accepted(OrderAccepted {
623 trader_id: TraderId::from("TRADER-001"),
624 strategy_id: StrategyId::from("STRATEGY-001"),
625 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
626 client_order_id: ClientOrderId::from("O-001"),
627 venue_order_id: VenueOrderId::from("V-001"),
628 account_id: AccountId::from("ACCOUNT-001"),
629 event_id: UUID4::new(),
630 ts_event: Default::default(),
631 ts_init: Default::default(),
632 reconciliation: 0,
633 });
634
635 match submitted {
636 OrderEventAny::Rejected(_) => panic!("Should not match"),
637 OrderEventAny::Canceled(_) => panic!("Should not match"),
638 OrderEventAny::Expired(_) => panic!("Should not match"),
639 OrderEventAny::Updated(_) => panic!("Should not match"),
640 OrderEventAny::Filled(_) => panic!("Should not match"),
641 _ => {}
642 }
643 match accepted {
644 OrderEventAny::Rejected(_) => panic!("Should not match"),
645 OrderEventAny::Canceled(_) => panic!("Should not match"),
646 OrderEventAny::Expired(_) => panic!("Should not match"),
647 OrderEventAny::Updated(_) => panic!("Should not match"),
648 OrderEventAny::Filled(_) => panic!("Should not match"),
649 _ => {}
650 }
651 }
652}