nautilus_execution/order_manager/
manager.rs1use std::{cell::RefCell, collections::HashMap, rc::Rc};
17
18use nautilus_common::{
19 cache::Cache,
20 clock::Clock,
21 logging::{CMD, EVT, SENT},
22 msgbus::MessageBus,
23};
24use nautilus_core::UUID4;
25use nautilus_model::{
26 enums::{ContingencyType, TriggerType},
27 events::{
28 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
29 },
30 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
31 orders::OrderAny,
32 types::Quantity,
33};
34use ustr::Ustr;
35
36use crate::messages::{
37 SubmitOrder, TradingCommand,
38 cancel::{CancelOrderHandler, CancelOrderHandlerAny},
39 modify::{ModifyOrderHandler, ModifyOrderHandlerAny},
40 submit::{SubmitOrderHandler, SubmitOrderHandlerAny},
41};
42
/// Coordinates submit, cancel and modify requests for orders and reacts to
/// order events by updating the orders linked to them (contingencies).
pub struct OrderManager {
    /// Clock used to timestamp the submit commands this manager creates.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, position/client IDs and exec-spawn quantities.
    cache: Rc<RefCell<Cache>>,
    /// Message bus on which commands and events are sent to other components.
    msgbus: Rc<RefCell<MessageBus>>,
    /// When `false`, `should_manage_order` rejects every order.
    active_local: bool,
    /// Optional callback invoked to submit an order.
    submit_order_handler: Option<SubmitOrderHandlerAny>,
    /// Optional callback invoked to cancel an order.
    cancel_order_handler: Option<CancelOrderHandlerAny>,
    /// Optional callback invoked to change an order's quantity.
    modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// Submit commands cached by the client order ID of the order they carry.
    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
}
53
54impl OrderManager {
55 pub fn new(
56 clock: Rc<RefCell<dyn Clock>>,
57 msgbus: Rc<RefCell<MessageBus>>,
58 cache: Rc<RefCell<Cache>>,
59 active_local: bool,
60 submit_order_handler: Option<SubmitOrderHandlerAny>,
61 cancel_order_handler: Option<CancelOrderHandlerAny>,
62 modify_order_handler: Option<ModifyOrderHandlerAny>,
63 ) -> Self {
64 Self {
65 clock,
66 cache,
67 msgbus,
68 active_local,
69 submit_order_handler,
70 cancel_order_handler,
71 modify_order_handler,
72 submit_order_commands: HashMap::new(),
73 }
74 }
75
76 pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
77 self.submit_order_handler = Some(handler);
78 }
79
80 pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
81 self.cancel_order_handler = Some(handler);
82 }
83
84 pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
85 self.modify_order_handler = Some(handler);
86 }
87
88 #[must_use]
89 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
90 self.submit_order_commands.clone()
91 }
92
93 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
94 self.submit_order_commands
95 .insert(command.order.client_order_id(), command);
96 }
97
98 pub fn pop_submit_order_command(
99 &mut self,
100 client_order_id: ClientOrderId,
101 ) -> Option<SubmitOrder> {
102 self.submit_order_commands.remove(&client_order_id)
103 }
104
105 pub fn reset(&mut self) {
106 self.submit_order_commands.clear();
107 }
108
109 pub fn cancel_order(&mut self, order: &OrderAny) {
110 if self
111 .cache
112 .borrow()
113 .is_order_pending_cancel_local(&order.client_order_id())
114 {
115 return;
116 }
117
118 if order.is_closed() {
119 log::warn!("Cannot cancel order: already closed");
120 return;
121 }
122
123 self.submit_order_commands.remove(&order.client_order_id());
124
125 if let Some(handler) = &self.cancel_order_handler {
126 handler.handle_cancel_order(order);
127 }
128 }
129
130 pub fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
131 if let Some(handler) = &self.modify_order_handler {
132 handler.handle_modify_order(order, new_quantity);
133 }
134 }
135
136 pub fn create_new_submit_order(
137 &mut self,
138 order: &OrderAny,
139 position_id: Option<PositionId>,
140 client_id: Option<ClientId>,
141 ) -> anyhow::Result<()> {
142 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
143 let venue_order_id = order
144 .venue_order_id()
145 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
146
147 let submit = SubmitOrder::new(
148 order.trader_id(),
149 client_id,
150 order.strategy_id(),
151 order.instrument_id(),
152 order.client_order_id(),
153 venue_order_id,
154 order.clone(),
155 order.exec_algorithm_id(),
156 position_id,
157 UUID4::new(),
158 self.clock.borrow().timestamp_ns(),
159 )?;
160
161 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
162 self.cache_submit_order_command(submit.clone());
163
164 match order.exec_algorithm_id() {
165 Some(exec_algorithm_id) => {
166 self.send_algo_command(submit, exec_algorithm_id);
167 }
168 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
169 }
170 } else if let Some(handler) = &self.submit_order_handler {
171 handler.handle_submit_order(submit);
172 }
173
174 Ok(())
175 }
176
177 #[must_use]
178 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
179 self.active_local && order.is_active_local()
180 }
181
182 pub fn handle_event(&mut self, event: OrderEventAny) {
184 match event {
185 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
186 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
187 OrderEventAny::Expired(event) => self.handle_order_expired(event),
188 OrderEventAny::Updated(event) => self.handle_order_updated(event),
189 OrderEventAny::Filled(event) => self.handle_order_filled(event),
190 _ => self.handle_position_event(event),
191 }
192 }
193
194 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
195 let cloned_order = self
196 .cache
197 .borrow()
198 .order(&rejected.client_order_id)
199 .cloned();
200 if let Some(order) = cloned_order {
201 if order.contingency_type() != Some(ContingencyType::NoContingency) {
202 self.handle_contingencies(order);
203 }
204 } else {
205 log::error!(
206 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
207 rejected.client_order_id,
208 rejected
209 );
210 }
211 }
212
213 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
214 let cloned_order = self
215 .cache
216 .borrow()
217 .order(&canceled.client_order_id)
218 .cloned();
219 if let Some(order) = cloned_order {
220 if order.contingency_type() != Some(ContingencyType::NoContingency) {
221 self.handle_contingencies(order);
222 }
223 } else {
224 log::error!(
225 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
226 canceled.client_order_id,
227 canceled
228 );
229 }
230 }
231
232 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
233 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
234 if let Some(order) = cloned_order {
235 if order.contingency_type() != Some(ContingencyType::NoContingency) {
236 self.handle_contingencies(order);
237 }
238 } else {
239 log::error!(
240 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
241 expired.client_order_id,
242 expired
243 );
244 }
245 }
246
247 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
248 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
249 if let Some(order) = cloned_order {
250 if order.contingency_type() != Some(ContingencyType::NoContingency) {
251 self.handle_contingencies_update(order);
252 }
253 } else {
254 log::error!(
255 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
256 updated.client_order_id,
257 updated
258 );
259 }
260 }
261
262 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
263 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
264 {
265 order
266 } else {
267 log::error!(
268 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
269 filled.client_order_id,
270 filled
271 );
272 return;
273 };
274
275 match order.contingency_type() {
276 Some(ContingencyType::Oto) => {
277 let position_id = self
278 .cache
279 .borrow()
280 .position_id(&order.client_order_id())
281 .copied();
282 let client_id = self
283 .cache
284 .borrow()
285 .client_id(&order.client_order_id())
286 .copied();
287
288 let parent_filled_qty = match order.exec_spawn_id() {
289 Some(spawn_id) => {
290 if let Some(qty) = self
291 .cache
292 .borrow()
293 .exec_spawn_total_filled_qty(&spawn_id, true)
294 {
295 qty
296 } else {
297 log::error!("Failed to get spawn filled quantity for {spawn_id}");
298 return;
299 }
300 }
301 None => order.filled_qty(),
302 };
303
304 let linked_orders = if let Some(orders) = order.linked_order_ids() {
305 orders
306 } else {
307 log::error!("No linked orders found for OTO order");
308 return;
309 };
310
311 for client_order_id in linked_orders {
312 let mut child_order =
313 if let Some(order) = self.cache.borrow().order(&client_order_id).cloned() {
314 order
315 } else {
316 panic!(
317 "Cannot find OTO child order for client_order_id: {client_order_id}"
318 );
319 };
320
321 if !self.should_manage_order(&child_order) {
322 continue;
323 }
324
325 if child_order.position_id().is_none() {
326 child_order.set_position_id(position_id);
327 }
328
329 if parent_filled_qty != child_order.leaves_qty() {
330 self.modify_order_quantity(&mut child_order, parent_filled_qty);
331 }
332
333 if self.submit_order_handler.is_none() {
334 return;
335 }
336
337 if !self
338 .submit_order_commands
339 .contains_key(&child_order.client_order_id())
340 {
341 if let Err(e) =
342 self.create_new_submit_order(&child_order, position_id, client_id)
343 {
344 log::error!("Failed to create new submit order: {e}");
345 }
346 }
347 }
348 }
349 Some(ContingencyType::Oco) => {
350 let linked_orders = if let Some(orders) = order.linked_order_ids() {
351 orders
352 } else {
353 log::error!("No linked orders found for OCO order");
354 return;
355 };
356
357 for client_order_id in linked_orders {
358 let contingent_order = match self
359 .cache
360 .borrow()
361 .order(&client_order_id)
362 .cloned()
363 {
364 Some(contingent_order) => contingent_order,
365 None => {
366 panic!(
367 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
368 );
369 }
370 };
371
372 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
374 {
375 continue;
376 }
377 if contingent_order.client_order_id() != order.client_order_id() {
378 self.cancel_order(&contingent_order);
379 }
380 }
381 }
382 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
383 _ => {}
384 }
385 }
386
387 pub fn handle_contingencies(&mut self, order: OrderAny) {
388 let (filled_qty, leaves_qty, is_spawn_active) =
389 if let Some(exec_spawn_id) = order.exec_spawn_id() {
390 if let (Some(filled), Some(leaves)) = (
391 self.cache
392 .borrow()
393 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
394 self.cache
395 .borrow()
396 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
397 ) {
398 (filled, leaves, leaves.raw > 0)
399 } else {
400 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
401 return;
402 }
403 } else {
404 (order.filled_qty(), order.leaves_qty(), false)
405 };
406
407 let linked_orders = if let Some(orders) = order.linked_order_ids() {
408 orders
409 } else {
410 log::error!("No linked orders found");
411 return;
412 };
413
414 for client_order_id in linked_orders {
415 let mut contingent_order =
416 if let Some(order) = self.cache.borrow().order(&client_order_id).cloned() {
417 order
418 } else {
419 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
420 };
421
422 if !self.should_manage_order(&contingent_order)
423 || client_order_id == order.client_order_id()
424 {
425 continue;
426 }
427
428 if contingent_order.is_closed() {
429 self.submit_order_commands.remove(&order.client_order_id());
430 continue;
431 }
432
433 match order.contingency_type() {
434 Some(ContingencyType::Oto) => {
435 if order.is_closed()
436 && filled_qty.raw == 0
437 && (order.exec_spawn_id().is_none() || !is_spawn_active)
438 {
439 self.cancel_order(&contingent_order);
440 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
441 self.modify_order_quantity(&mut contingent_order, filled_qty);
442 }
443 }
444 Some(ContingencyType::Oco) => {
445 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
446 self.cancel_order(&contingent_order);
447 }
448 }
449 Some(ContingencyType::Ouo) => {
450 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
451 || (order.is_closed()
452 && (order.exec_spawn_id().is_none() || !is_spawn_active))
453 {
454 self.cancel_order(&contingent_order);
455 } else if leaves_qty != contingent_order.leaves_qty() {
456 self.modify_order_quantity(&mut contingent_order, leaves_qty);
457 }
458 }
459 _ => {}
460 }
461 }
462 }
463
464 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
465 let quantity = match order.exec_spawn_id() {
466 Some(exec_spawn_id) => {
467 if let Some(qty) = self
468 .cache
469 .borrow()
470 .exec_spawn_total_quantity(&exec_spawn_id, true)
471 {
472 qty
473 } else {
474 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
475 return;
476 }
477 }
478 None => order.quantity(),
479 };
480
481 if quantity.raw == 0 {
482 return;
483 }
484
485 let linked_orders = if let Some(orders) = order.linked_order_ids() {
486 orders
487 } else {
488 log::error!("No linked orders found for contingent order");
489 return;
490 };
491
492 for client_order_id in linked_orders {
493 let mut contingent_order = match self.cache.borrow().order(&client_order_id).cloned() {
494 Some(contingent_order) => contingent_order,
495 None => panic!(
496 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
497 ),
498 };
499
500 if !self.should_manage_order(&contingent_order)
501 || client_order_id == order.client_order_id()
502 || contingent_order.is_closed()
503 {
504 continue;
505 }
506
507 if let Some(contingency_type) = order.contingency_type() {
508 if matches!(
509 contingency_type,
510 ContingencyType::Oto | ContingencyType::Oco
511 ) && quantity != contingent_order.quantity()
512 {
513 self.modify_order_quantity(&mut contingent_order, quantity);
514 }
515 }
516 }
517 }
518
/// Placeholder for position-event handling; not implemented yet.
///
/// # Panics
///
/// Always panics, because the body is `todo!()`.
pub fn handle_position_event(&mut self, _event: OrderEventAny) {
    todo!()
}
522
523 pub fn send_emulator_command(&self, command: TradingCommand) {
525 log::info!("{CMD}{SENT} {command}");
526
527 self.msgbus
528 .borrow()
529 .send(&Ustr::from("OrderEmulator.execute"), &command);
530 }
531
532 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
533 log::info!("{CMD}{SENT} {command}");
534
535 let endpoint = format!("{exec_algorithm_id}.execute");
536 self.msgbus.borrow().send(
537 &Ustr::from(&endpoint),
538 &TradingCommand::SubmitOrder(command),
539 );
540 }
541
542 pub fn send_risk_command(&self, command: TradingCommand) {
543 log::info!("{CMD}{SENT} {command}");
544
545 self.msgbus
546 .borrow()
547 .send(&Ustr::from("RiskEngine.execute"), &command);
548 }
549
550 pub fn send_exec_command(&self, command: TradingCommand) {
551 log::info!("{CMD}{SENT} {command}");
552
553 self.msgbus
554 .borrow()
555 .send(&Ustr::from("ExecEngine.execute"), &command);
556 }
557
558 pub fn send_risk_event(&self, event: OrderEventAny) {
559 log::info!("{}{} {}", EVT, SENT, event);
560 self.msgbus
561 .borrow()
562 .send(&Ustr::from("RiskEngine.process"), &event);
563 }
564
565 pub fn send_exec_event(&self, event: OrderEventAny) {
566 log::info!("{}{} {}", EVT, SENT, event);
567 self.msgbus
568 .borrow()
569 .send(&Ustr::from("ExecEngine.process"), &event);
570 }
571}