nautilus_execution/order_manager/
manager.rs1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 logging::{CMD, EVT, SEND},
26 messages::execution::{SubmitOrder, TradingCommand},
27 msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31 enums::{ContingencyType, TriggerType},
32 events::{
33 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34 },
35 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36 orders::{Order, OrderAny},
37 types::Quantity,
38};
39
40pub struct OrderManager {
47 clock: Rc<RefCell<dyn Clock>>,
48 cache: Rc<RefCell<Cache>>,
49 active_local: bool,
50 submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
54}
55
56impl Debug for OrderManager {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct(stringify!(OrderManager))
59 .field("pending_commands", &self.submit_order_commands.len())
60 .finish()
61 }
62}
63
64impl OrderManager {
65 pub fn new(
67 clock: Rc<RefCell<dyn Clock>>,
68 cache: Rc<RefCell<Cache>>,
69 active_local: bool,
70 ) -> Self {
74 Self {
75 clock,
76 cache,
77 active_local,
78 submit_order_commands: HashMap::new(),
82 }
83 }
84
85 #[must_use]
98 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100 self.submit_order_commands.clone()
101 }
102
103 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105 self.submit_order_commands
106 .insert(command.order.client_order_id(), command);
107 }
108
109 pub fn pop_submit_order_command(
111 &mut self,
112 client_order_id: ClientOrderId,
113 ) -> Option<SubmitOrder> {
114 self.submit_order_commands.remove(&client_order_id)
115 }
116
117 pub fn reset(&mut self) {
119 self.submit_order_commands.clear();
120 }
121
122 pub fn cancel_order(&mut self, order: &OrderAny) {
124 if self
125 .cache
126 .borrow()
127 .is_order_pending_cancel_local(&order.client_order_id())
128 {
129 return;
130 }
131
132 if order.is_closed() {
133 log::warn!("Cannot cancel order: already closed");
134 return;
135 }
136
137 self.submit_order_commands.remove(&order.client_order_id());
138
139 }
143
144 pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
146 }
150
151 pub fn create_new_submit_order(
155 &mut self,
156 order: &OrderAny,
157 position_id: Option<PositionId>,
158 client_id: Option<ClientId>,
159 ) -> anyhow::Result<()> {
160 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161 let venue_order_id = order
162 .venue_order_id()
163 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165 let submit = SubmitOrder::new(
166 order.trader_id(),
167 client_id,
168 order.strategy_id(),
169 order.instrument_id(),
170 order.client_order_id(),
171 venue_order_id,
172 order.clone(),
173 order.exec_algorithm_id(),
174 position_id,
175 UUID4::new(),
176 self.clock.borrow().timestamp_ns(),
177 )?;
178
179 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
180 self.cache_submit_order_command(submit.clone());
181
182 match order.exec_algorithm_id() {
183 Some(exec_algorithm_id) => {
184 self.send_algo_command(submit, exec_algorithm_id);
185 }
186 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
187 }
188 } Ok(())
193 }
194
195 #[must_use]
196 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
198 self.active_local && order.is_active_local()
199 }
200
201 pub fn handle_event(&mut self, event: OrderEventAny) {
204 match event {
205 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
206 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
207 OrderEventAny::Expired(event) => self.handle_order_expired(event),
208 OrderEventAny::Updated(event) => self.handle_order_updated(event),
209 OrderEventAny::Filled(event) => self.handle_order_filled(event),
210 _ => self.handle_position_event(event),
211 }
212 }
213
214 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
216 let cloned_order = self
217 .cache
218 .borrow()
219 .order(&rejected.client_order_id)
220 .cloned();
221 if let Some(order) = cloned_order {
222 if order.contingency_type() != Some(ContingencyType::NoContingency) {
223 self.handle_contingencies(order);
224 }
225 } else {
226 log::error!(
227 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
228 rejected.client_order_id,
229 rejected
230 );
231 }
232 }
233
234 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
235 let cloned_order = self
236 .cache
237 .borrow()
238 .order(&canceled.client_order_id)
239 .cloned();
240 if let Some(order) = cloned_order {
241 if order.contingency_type() != Some(ContingencyType::NoContingency) {
242 self.handle_contingencies(order);
243 }
244 } else {
245 log::error!(
246 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
247 canceled.client_order_id,
248 canceled
249 );
250 }
251 }
252
253 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
254 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
255 if let Some(order) = cloned_order {
256 if order.contingency_type() != Some(ContingencyType::NoContingency) {
257 self.handle_contingencies(order);
258 }
259 } else {
260 log::error!(
261 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
262 expired.client_order_id,
263 expired
264 );
265 }
266 }
267
268 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
269 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
270 if let Some(order) = cloned_order {
271 if order.contingency_type() != Some(ContingencyType::NoContingency) {
272 self.handle_contingencies_update(order);
273 }
274 } else {
275 log::error!(
276 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
277 updated.client_order_id,
278 updated
279 );
280 }
281 }
282
283 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
287 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
288 {
289 order
290 } else {
291 log::error!(
292 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
293 filled.client_order_id,
294 filled
295 );
296 return;
297 };
298
299 match order.contingency_type() {
300 Some(ContingencyType::Oto) => {
301 let position_id = self
302 .cache
303 .borrow()
304 .position_id(&order.client_order_id())
305 .copied();
306 let client_id = self
307 .cache
308 .borrow()
309 .client_id(&order.client_order_id())
310 .copied();
311
312 let parent_filled_qty = match order.exec_spawn_id() {
313 Some(spawn_id) => {
314 if let Some(qty) = self
315 .cache
316 .borrow()
317 .exec_spawn_total_filled_qty(&spawn_id, true)
318 {
319 qty
320 } else {
321 log::error!("Failed to get spawn filled quantity for {spawn_id}");
322 return;
323 }
324 }
325 None => order.filled_qty(),
326 };
327
328 let linked_orders = if let Some(orders) = order.linked_order_ids() {
329 orders
330 } else {
331 log::error!("No linked orders found for OTO order");
332 return;
333 };
334
335 for client_order_id in linked_orders {
336 let mut child_order =
337 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
338 order
339 } else {
340 panic!(
341 "Cannot find OTO child order for client_order_id: {client_order_id}"
342 );
343 };
344
345 if !self.should_manage_order(&child_order) {
346 continue;
347 }
348
349 if child_order.position_id().is_none() {
350 child_order.set_position_id(position_id);
351 }
352
353 if parent_filled_qty != child_order.leaves_qty() {
354 self.modify_order_quantity(&mut child_order, parent_filled_qty);
355 }
356
357 if !self
362 .submit_order_commands
363 .contains_key(&child_order.client_order_id())
364 && let Err(e) =
365 self.create_new_submit_order(&child_order, position_id, client_id)
366 {
367 log::error!("Failed to create new submit order: {e}");
368 }
369 }
370 }
371 Some(ContingencyType::Oco) => {
372 let linked_orders = if let Some(orders) = order.linked_order_ids() {
373 orders
374 } else {
375 log::error!("No linked orders found for OCO order");
376 return;
377 };
378
379 for client_order_id in linked_orders {
380 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
381 {
382 Some(contingent_order) => contingent_order,
383 None => {
384 panic!(
385 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
386 );
387 }
388 };
389
390 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
392 {
393 continue;
394 }
395 if contingent_order.client_order_id() != order.client_order_id() {
396 self.cancel_order(&contingent_order);
397 }
398 }
399 }
400 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
401 _ => {}
402 }
403 }
404
405 pub fn handle_contingencies(&mut self, order: OrderAny) {
409 let (filled_qty, leaves_qty, is_spawn_active) =
410 if let Some(exec_spawn_id) = order.exec_spawn_id() {
411 if let (Some(filled), Some(leaves)) = (
412 self.cache
413 .borrow()
414 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
415 self.cache
416 .borrow()
417 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
418 ) {
419 (filled, leaves, leaves.raw > 0)
420 } else {
421 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
422 return;
423 }
424 } else {
425 (order.filled_qty(), order.leaves_qty(), false)
426 };
427
428 let linked_orders = if let Some(orders) = order.linked_order_ids() {
429 orders
430 } else {
431 log::error!("No linked orders found");
432 return;
433 };
434
435 for client_order_id in linked_orders {
436 let mut contingent_order =
437 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
438 order
439 } else {
440 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
441 };
442
443 if !self.should_manage_order(&contingent_order)
444 || client_order_id == &order.client_order_id()
445 {
446 continue;
447 }
448
449 if contingent_order.is_closed() {
450 self.submit_order_commands.remove(&order.client_order_id());
451 continue;
452 }
453
454 match order.contingency_type() {
455 Some(ContingencyType::Oto) => {
456 if order.is_closed()
457 && filled_qty.raw == 0
458 && (order.exec_spawn_id().is_none() || !is_spawn_active)
459 {
460 self.cancel_order(&contingent_order);
461 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
462 self.modify_order_quantity(&mut contingent_order, filled_qty);
463 }
464 }
465 Some(ContingencyType::Oco) => {
466 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
467 self.cancel_order(&contingent_order);
468 }
469 }
470 Some(ContingencyType::Ouo) => {
471 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
472 || (order.is_closed()
473 && (order.exec_spawn_id().is_none() || !is_spawn_active))
474 {
475 self.cancel_order(&contingent_order);
476 } else if leaves_qty != contingent_order.leaves_qty() {
477 self.modify_order_quantity(&mut contingent_order, leaves_qty);
478 }
479 }
480 _ => {}
481 }
482 }
483 }
484
485 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
489 let quantity = match order.exec_spawn_id() {
490 Some(exec_spawn_id) => {
491 if let Some(qty) = self
492 .cache
493 .borrow()
494 .exec_spawn_total_quantity(&exec_spawn_id, true)
495 {
496 qty
497 } else {
498 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
499 return;
500 }
501 }
502 None => order.quantity(),
503 };
504
505 if quantity.raw == 0 {
506 return;
507 }
508
509 let linked_orders = if let Some(orders) = order.linked_order_ids() {
510 orders
511 } else {
512 log::error!("No linked orders found for contingent order");
513 return;
514 };
515
516 for client_order_id in linked_orders {
517 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
518 Some(contingent_order) => contingent_order,
519 None => panic!(
520 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
521 ),
522 };
523
524 if !self.should_manage_order(&contingent_order)
525 || client_order_id == &order.client_order_id()
526 || contingent_order.is_closed()
527 {
528 continue;
529 }
530
531 if let Some(contingency_type) = order.contingency_type()
532 && matches!(
533 contingency_type,
534 ContingencyType::Oto | ContingencyType::Oco
535 )
536 && quantity != contingent_order.quantity()
537 {
538 self.modify_order_quantity(&mut contingent_order, quantity);
539 }
540 }
541 }
542
543 pub fn handle_position_event(&mut self, _event: OrderEventAny) {
544 todo!()
545 }
546
547 pub fn send_emulator_command(&self, command: TradingCommand) {
549 log::info!("{CMD}{SEND} {command}");
550
551 msgbus::send_any("OrderEmulator.execute".into(), &command);
552 }
553
554 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
555 log::info!("{CMD}{SEND} {command}");
556
557 let endpoint = format!("{exec_algorithm_id}.execute");
558 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
559 }
560
561 pub fn send_risk_command(&self, command: TradingCommand) {
562 log::info!("{CMD}{SEND} {command}");
563 msgbus::send_any("RiskEngine.execute".into(), &command);
564 }
565
566 pub fn send_exec_command(&self, command: TradingCommand) {
567 log::info!("{CMD}{SEND} {command}");
568 msgbus::send_any("ExecEngine.execute".into(), &command);
569 }
570
571 pub fn send_risk_event(&self, event: OrderEventAny) {
572 log::info!("{EVT}{SEND} {event}");
573 msgbus::send_any("RiskEngine.process".into(), &event);
574 }
575
576 pub fn send_exec_event(&self, event: OrderEventAny) {
577 log::info!("{EVT}{SEND} {event}");
578 msgbus::send_any("ExecEngine.process".into(), &event);
579 }
580}