nautilus_execution/order_manager/
manager.rs1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 logging::{CMD, EVT, SEND},
26 messages::execution::{SubmitOrder, TradingCommand},
27 msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31 enums::{ContingencyType, TriggerType},
32 events::{
33 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34 },
35 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36 orders::{Order, OrderAny},
37 types::Quantity,
38};
39
40pub struct OrderManager {
47 clock: Rc<RefCell<dyn Clock>>,
48 cache: Rc<RefCell<Cache>>,
49 active_local: bool,
50 submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
54}
55
56impl Debug for OrderManager {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct(stringify!(OrderManager))
59 .field("pending_commands", &self.submit_order_commands.len())
60 .finish()
61 }
62}
63
64impl OrderManager {
65 pub fn new(
67 clock: Rc<RefCell<dyn Clock>>,
68 cache: Rc<RefCell<Cache>>,
69 active_local: bool,
70 ) -> Self {
74 Self {
75 clock,
76 cache,
77 active_local,
78 submit_order_commands: HashMap::new(),
82 }
83 }
84
85 #[must_use]
98 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100 self.submit_order_commands.clone()
101 }
102
103 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105 self.submit_order_commands
106 .insert(command.order.client_order_id(), command);
107 }
108
109 pub fn pop_submit_order_command(
111 &mut self,
112 client_order_id: ClientOrderId,
113 ) -> Option<SubmitOrder> {
114 self.submit_order_commands.remove(&client_order_id)
115 }
116
117 pub fn reset(&mut self) {
119 self.submit_order_commands.clear();
120 }
121
122 pub fn cancel_order(&mut self, order: &OrderAny) {
124 if self
125 .cache
126 .borrow()
127 .is_order_pending_cancel_local(&order.client_order_id())
128 {
129 return;
130 }
131
132 if order.is_closed() {
133 log::warn!("Cannot cancel order: already closed");
134 return;
135 }
136
137 self.submit_order_commands.remove(&order.client_order_id());
138
139 }
143
144 pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
146 }
150
151 pub fn create_new_submit_order(
155 &mut self,
156 order: &OrderAny,
157 position_id: Option<PositionId>,
158 client_id: Option<ClientId>,
159 ) -> anyhow::Result<()> {
160 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161 let venue_order_id = order
162 .venue_order_id()
163 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165 let submit = SubmitOrder::new(
166 order.trader_id(),
167 client_id,
168 order.strategy_id(),
169 order.instrument_id(),
170 order.client_order_id(),
171 venue_order_id,
172 order.clone(),
173 order.exec_algorithm_id(),
174 position_id,
175 None, UUID4::new(),
177 self.clock.borrow().timestamp_ns(),
178 )?;
179
180 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
181 self.cache_submit_order_command(submit.clone());
182
183 match order.exec_algorithm_id() {
184 Some(exec_algorithm_id) => {
185 self.send_algo_command(submit, exec_algorithm_id);
186 }
187 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
188 }
189 } Ok(())
194 }
195
196 #[must_use]
197 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
199 self.active_local && order.is_active_local()
200 }
201
202 pub fn handle_event(&mut self, event: OrderEventAny) {
205 match event {
206 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
207 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
208 OrderEventAny::Expired(event) => self.handle_order_expired(event),
209 OrderEventAny::Updated(event) => self.handle_order_updated(event),
210 OrderEventAny::Filled(event) => self.handle_order_filled(event),
211 _ => self.handle_position_event(event),
212 }
213 }
214
215 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
217 let cloned_order = self
218 .cache
219 .borrow()
220 .order(&rejected.client_order_id)
221 .cloned();
222 if let Some(order) = cloned_order {
223 if order.contingency_type() != Some(ContingencyType::NoContingency) {
224 self.handle_contingencies(order);
225 }
226 } else {
227 log::error!(
228 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
229 rejected.client_order_id,
230 rejected
231 );
232 }
233 }
234
235 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
236 let cloned_order = self
237 .cache
238 .borrow()
239 .order(&canceled.client_order_id)
240 .cloned();
241 if let Some(order) = cloned_order {
242 if order.contingency_type() != Some(ContingencyType::NoContingency) {
243 self.handle_contingencies(order);
244 }
245 } else {
246 log::error!(
247 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
248 canceled.client_order_id,
249 canceled
250 );
251 }
252 }
253
254 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
255 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
256 if let Some(order) = cloned_order {
257 if order.contingency_type() != Some(ContingencyType::NoContingency) {
258 self.handle_contingencies(order);
259 }
260 } else {
261 log::error!(
262 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
263 expired.client_order_id,
264 expired
265 );
266 }
267 }
268
269 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
270 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
271 if let Some(order) = cloned_order {
272 if order.contingency_type() != Some(ContingencyType::NoContingency) {
273 self.handle_contingencies_update(order);
274 }
275 } else {
276 log::error!(
277 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
278 updated.client_order_id,
279 updated
280 );
281 }
282 }
283
284 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
288 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
289 {
290 order
291 } else {
292 log::error!(
293 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
294 filled.client_order_id,
295 filled
296 );
297 return;
298 };
299
300 match order.contingency_type() {
301 Some(ContingencyType::Oto) => {
302 let position_id = self
303 .cache
304 .borrow()
305 .position_id(&order.client_order_id())
306 .copied();
307 let client_id = self
308 .cache
309 .borrow()
310 .client_id(&order.client_order_id())
311 .copied();
312
313 let parent_filled_qty = match order.exec_spawn_id() {
314 Some(spawn_id) => {
315 if let Some(qty) = self
316 .cache
317 .borrow()
318 .exec_spawn_total_filled_qty(&spawn_id, true)
319 {
320 qty
321 } else {
322 log::error!("Failed to get spawn filled quantity for {spawn_id}");
323 return;
324 }
325 }
326 None => order.filled_qty(),
327 };
328
329 let linked_orders = if let Some(orders) = order.linked_order_ids() {
330 orders
331 } else {
332 log::error!("No linked orders found for OTO order");
333 return;
334 };
335
336 for client_order_id in linked_orders {
337 let mut child_order =
338 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
339 order
340 } else {
341 panic!(
342 "Cannot find OTO child order for client_order_id: {client_order_id}"
343 );
344 };
345
346 if !self.should_manage_order(&child_order) {
347 continue;
348 }
349
350 if child_order.position_id().is_none() {
351 child_order.set_position_id(position_id);
352 }
353
354 if parent_filled_qty != child_order.leaves_qty() {
355 self.modify_order_quantity(&mut child_order, parent_filled_qty);
356 }
357
358 if !self
363 .submit_order_commands
364 .contains_key(&child_order.client_order_id())
365 && let Err(e) =
366 self.create_new_submit_order(&child_order, position_id, client_id)
367 {
368 log::error!("Failed to create new submit order: {e}");
369 }
370 }
371 }
372 Some(ContingencyType::Oco) => {
373 let linked_orders = if let Some(orders) = order.linked_order_ids() {
374 orders
375 } else {
376 log::error!("No linked orders found for OCO order");
377 return;
378 };
379
380 for client_order_id in linked_orders {
381 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
382 {
383 Some(contingent_order) => contingent_order,
384 None => {
385 panic!(
386 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
387 );
388 }
389 };
390
391 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
393 {
394 continue;
395 }
396 if contingent_order.client_order_id() != order.client_order_id() {
397 self.cancel_order(&contingent_order);
398 }
399 }
400 }
401 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
402 _ => {}
403 }
404 }
405
406 pub fn handle_contingencies(&mut self, order: OrderAny) {
410 let (filled_qty, leaves_qty, is_spawn_active) =
411 if let Some(exec_spawn_id) = order.exec_spawn_id() {
412 if let (Some(filled), Some(leaves)) = (
413 self.cache
414 .borrow()
415 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
416 self.cache
417 .borrow()
418 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
419 ) {
420 (filled, leaves, leaves.raw > 0)
421 } else {
422 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
423 return;
424 }
425 } else {
426 (order.filled_qty(), order.leaves_qty(), false)
427 };
428
429 let linked_orders = if let Some(orders) = order.linked_order_ids() {
430 orders
431 } else {
432 log::error!("No linked orders found");
433 return;
434 };
435
436 for client_order_id in linked_orders {
437 let mut contingent_order =
438 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
439 order
440 } else {
441 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
442 };
443
444 if !self.should_manage_order(&contingent_order)
445 || client_order_id == &order.client_order_id()
446 {
447 continue;
448 }
449
450 if contingent_order.is_closed() {
451 self.submit_order_commands.remove(&order.client_order_id());
452 continue;
453 }
454
455 match order.contingency_type() {
456 Some(ContingencyType::Oto) => {
457 if order.is_closed()
458 && filled_qty.raw == 0
459 && (order.exec_spawn_id().is_none() || !is_spawn_active)
460 {
461 self.cancel_order(&contingent_order);
462 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
463 self.modify_order_quantity(&mut contingent_order, filled_qty);
464 }
465 }
466 Some(ContingencyType::Oco) => {
467 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
468 self.cancel_order(&contingent_order);
469 }
470 }
471 Some(ContingencyType::Ouo) => {
472 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
473 || (order.is_closed()
474 && (order.exec_spawn_id().is_none() || !is_spawn_active))
475 {
476 self.cancel_order(&contingent_order);
477 } else if leaves_qty != contingent_order.leaves_qty() {
478 self.modify_order_quantity(&mut contingent_order, leaves_qty);
479 }
480 }
481 _ => {}
482 }
483 }
484 }
485
486 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
490 let quantity = match order.exec_spawn_id() {
491 Some(exec_spawn_id) => {
492 if let Some(qty) = self
493 .cache
494 .borrow()
495 .exec_spawn_total_quantity(&exec_spawn_id, true)
496 {
497 qty
498 } else {
499 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
500 return;
501 }
502 }
503 None => order.quantity(),
504 };
505
506 if quantity.raw == 0 {
507 return;
508 }
509
510 let linked_orders = if let Some(orders) = order.linked_order_ids() {
511 orders
512 } else {
513 log::error!("No linked orders found for contingent order");
514 return;
515 };
516
517 for client_order_id in linked_orders {
518 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
519 Some(contingent_order) => contingent_order,
520 None => panic!(
521 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
522 ),
523 };
524
525 if !self.should_manage_order(&contingent_order)
526 || client_order_id == &order.client_order_id()
527 || contingent_order.is_closed()
528 {
529 continue;
530 }
531
532 if let Some(contingency_type) = order.contingency_type()
533 && matches!(
534 contingency_type,
535 ContingencyType::Oto | ContingencyType::Oco
536 )
537 && quantity != contingent_order.quantity()
538 {
539 self.modify_order_quantity(&mut contingent_order, quantity);
540 }
541 }
542 }
543
544 pub fn handle_position_event(&mut self, _event: OrderEventAny) {
545 todo!()
546 }
547
548 pub fn send_emulator_command(&self, command: TradingCommand) {
550 log::info!("{CMD}{SEND} {command}");
551
552 msgbus::send_any("OrderEmulator.execute".into(), &command);
553 }
554
555 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
556 log::info!("{CMD}{SEND} {command}");
557
558 let endpoint = format!("{exec_algorithm_id}.execute");
559 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
560 }
561
562 pub fn send_risk_command(&self, command: TradingCommand) {
563 log::info!("{CMD}{SEND} {command}");
564 msgbus::send_any("RiskEngine.execute".into(), &command);
565 }
566
567 pub fn send_exec_command(&self, command: TradingCommand) {
568 log::info!("{CMD}{SEND} {command}");
569 msgbus::send_any("ExecEngine.execute".into(), &command);
570 }
571
572 pub fn send_risk_event(&self, event: OrderEventAny) {
573 log::info!("{EVT}{SEND} {event}");
574 msgbus::send_any("RiskEngine.process".into(), &event);
575 }
576
577 pub fn send_exec_event(&self, event: OrderEventAny) {
578 log::info!("{EVT}{SEND} {event}");
579 msgbus::send_any("ExecEngine.process".into(), &event);
580 }
581}