1use std::{cell::RefCell, collections::HashMap, rc::Rc};
17
18use nautilus_common::{
19 cache::Cache,
20 clock::Clock,
21 logging::{CMD, EVT, SENT},
22 msgbus::{self},
23};
24use nautilus_core::UUID4;
25use nautilus_model::{
26 enums::{ContingencyType, TriggerType},
27 events::{
28 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
29 },
30 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
31 orders::{Order, OrderAny},
32 types::Quantity,
33};
34use ustr::Ustr;
35
36use crate::messages::{
37 SubmitOrder, TradingCommand,
38 cancel::{CancelOrderHandler, CancelOrderHandlerAny},
39 modify::{ModifyOrderHandler, ModifyOrderHandlerAny},
40 submit::{SubmitOrderHandler, SubmitOrderHandlerAny},
41};
42
/// Manages locally active orders and reacts to order events on behalf of their
/// linked (contingent) orders.
///
/// The manager holds shared handles to a clock and a cache, three optional
/// handlers (submit / cancel / modify), and a map of `SubmitOrder` commands keyed
/// by client order ID.
pub struct OrderManager {
    /// Shared clock; read for the timestamp passed to newly built `SubmitOrder` commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, position IDs, client IDs and
    /// execution-spawn quantities.
    cache: Rc<RefCell<Cache>>,
    /// When `false`, `should_manage_order` returns `false` for every order.
    active_local: bool,
    /// Optional handler that receives `SubmitOrder` commands.
    submit_order_handler: Option<SubmitOrderHandlerAny>,
    /// Optional handler invoked by `cancel_order`.
    cancel_order_handler: Option<CancelOrderHandlerAny>,
    /// Optional handler invoked by `modify_order_quantity`.
    modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// `SubmitOrder` commands cached by the client order ID of their order.
    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
}
52
53impl OrderManager {
54 pub fn new(
55 clock: Rc<RefCell<dyn Clock>>,
56 cache: Rc<RefCell<Cache>>,
57 active_local: bool,
58 submit_order_handler: Option<SubmitOrderHandlerAny>,
59 cancel_order_handler: Option<CancelOrderHandlerAny>,
60 modify_order_handler: Option<ModifyOrderHandlerAny>,
61 ) -> Self {
62 Self {
63 clock,
64 cache,
65 active_local,
66 submit_order_handler,
67 cancel_order_handler,
68 modify_order_handler,
69 submit_order_commands: HashMap::new(),
70 }
71 }
72
73 pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
74 self.submit_order_handler = Some(handler);
75 }
76
77 pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
78 self.cancel_order_handler = Some(handler);
79 }
80
81 pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
82 self.modify_order_handler = Some(handler);
83 }
84
85 #[must_use]
86 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
87 self.submit_order_commands.clone()
88 }
89
90 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
91 self.submit_order_commands
92 .insert(command.order.client_order_id(), command);
93 }
94
95 pub fn pop_submit_order_command(
96 &mut self,
97 client_order_id: ClientOrderId,
98 ) -> Option<SubmitOrder> {
99 self.submit_order_commands.remove(&client_order_id)
100 }
101
102 pub fn reset(&mut self) {
103 self.submit_order_commands.clear();
104 }
105
106 pub fn cancel_order(&mut self, order: &OrderAny) {
107 if self
108 .cache
109 .borrow()
110 .is_order_pending_cancel_local(&order.client_order_id())
111 {
112 return;
113 }
114
115 if order.is_closed() {
116 log::warn!("Cannot cancel order: already closed");
117 return;
118 }
119
120 self.submit_order_commands.remove(&order.client_order_id());
121
122 if let Some(handler) = &self.cancel_order_handler {
123 handler.handle_cancel_order(order);
124 }
125 }
126
127 pub fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
128 if let Some(handler) = &self.modify_order_handler {
129 handler.handle_modify_order(order, new_quantity);
130 }
131 }
132
133 pub fn create_new_submit_order(
134 &mut self,
135 order: &OrderAny,
136 position_id: Option<PositionId>,
137 client_id: Option<ClientId>,
138 ) -> anyhow::Result<()> {
139 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
140 let venue_order_id = order
141 .venue_order_id()
142 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
143
144 let submit = SubmitOrder::new(
145 order.trader_id(),
146 client_id,
147 order.strategy_id(),
148 order.instrument_id(),
149 order.client_order_id(),
150 venue_order_id,
151 order.clone(),
152 order.exec_algorithm_id(),
153 position_id,
154 UUID4::new(),
155 self.clock.borrow().timestamp_ns(),
156 )?;
157
158 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
159 self.cache_submit_order_command(submit.clone());
160
161 match order.exec_algorithm_id() {
162 Some(exec_algorithm_id) => {
163 self.send_algo_command(submit, exec_algorithm_id);
164 }
165 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
166 }
167 } else if let Some(handler) = &self.submit_order_handler {
168 handler.handle_submit_order(submit);
169 }
170
171 Ok(())
172 }
173
174 #[must_use]
175 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
176 self.active_local && order.is_active_local()
177 }
178
179 pub fn handle_event(&mut self, event: OrderEventAny) {
181 match event {
182 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
183 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
184 OrderEventAny::Expired(event) => self.handle_order_expired(event),
185 OrderEventAny::Updated(event) => self.handle_order_updated(event),
186 OrderEventAny::Filled(event) => self.handle_order_filled(event),
187 _ => self.handle_position_event(event),
188 }
189 }
190
191 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
192 let cloned_order = self
193 .cache
194 .borrow()
195 .order(&rejected.client_order_id)
196 .cloned();
197 if let Some(order) = cloned_order {
198 if order.contingency_type() != Some(ContingencyType::NoContingency) {
199 self.handle_contingencies(order);
200 }
201 } else {
202 log::error!(
203 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
204 rejected.client_order_id,
205 rejected
206 );
207 }
208 }
209
210 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
211 let cloned_order = self
212 .cache
213 .borrow()
214 .order(&canceled.client_order_id)
215 .cloned();
216 if let Some(order) = cloned_order {
217 if order.contingency_type() != Some(ContingencyType::NoContingency) {
218 self.handle_contingencies(order);
219 }
220 } else {
221 log::error!(
222 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
223 canceled.client_order_id,
224 canceled
225 );
226 }
227 }
228
229 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
230 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
231 if let Some(order) = cloned_order {
232 if order.contingency_type() != Some(ContingencyType::NoContingency) {
233 self.handle_contingencies(order);
234 }
235 } else {
236 log::error!(
237 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
238 expired.client_order_id,
239 expired
240 );
241 }
242 }
243
244 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
245 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
246 if let Some(order) = cloned_order {
247 if order.contingency_type() != Some(ContingencyType::NoContingency) {
248 self.handle_contingencies_update(order);
249 }
250 } else {
251 log::error!(
252 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
253 updated.client_order_id,
254 updated
255 );
256 }
257 }
258
259 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
260 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
261 {
262 order
263 } else {
264 log::error!(
265 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
266 filled.client_order_id,
267 filled
268 );
269 return;
270 };
271
272 match order.contingency_type() {
273 Some(ContingencyType::Oto) => {
274 let position_id = self
275 .cache
276 .borrow()
277 .position_id(&order.client_order_id())
278 .copied();
279 let client_id = self
280 .cache
281 .borrow()
282 .client_id(&order.client_order_id())
283 .copied();
284
285 let parent_filled_qty = match order.exec_spawn_id() {
286 Some(spawn_id) => {
287 if let Some(qty) = self
288 .cache
289 .borrow()
290 .exec_spawn_total_filled_qty(&spawn_id, true)
291 {
292 qty
293 } else {
294 log::error!("Failed to get spawn filled quantity for {spawn_id}");
295 return;
296 }
297 }
298 None => order.filled_qty(),
299 };
300
301 let linked_orders = if let Some(orders) = order.linked_order_ids() {
302 orders
303 } else {
304 log::error!("No linked orders found for OTO order");
305 return;
306 };
307
308 for client_order_id in linked_orders {
309 let mut child_order =
310 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
311 order
312 } else {
313 panic!(
314 "Cannot find OTO child order for client_order_id: {client_order_id}"
315 );
316 };
317
318 if !self.should_manage_order(&child_order) {
319 continue;
320 }
321
322 if child_order.position_id().is_none() {
323 child_order.set_position_id(position_id);
324 }
325
326 if parent_filled_qty != child_order.leaves_qty() {
327 self.modify_order_quantity(&mut child_order, parent_filled_qty);
328 }
329
330 if self.submit_order_handler.is_none() {
331 return;
332 }
333
334 if !self
335 .submit_order_commands
336 .contains_key(&child_order.client_order_id())
337 {
338 if let Err(e) =
339 self.create_new_submit_order(&child_order, position_id, client_id)
340 {
341 log::error!("Failed to create new submit order: {e}");
342 }
343 }
344 }
345 }
346 Some(ContingencyType::Oco) => {
347 let linked_orders = if let Some(orders) = order.linked_order_ids() {
348 orders
349 } else {
350 log::error!("No linked orders found for OCO order");
351 return;
352 };
353
354 for client_order_id in linked_orders {
355 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
356 {
357 Some(contingent_order) => contingent_order,
358 None => {
359 panic!(
360 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
361 );
362 }
363 };
364
365 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
367 {
368 continue;
369 }
370 if contingent_order.client_order_id() != order.client_order_id() {
371 self.cancel_order(&contingent_order);
372 }
373 }
374 }
375 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
376 _ => {}
377 }
378 }
379
380 pub fn handle_contingencies(&mut self, order: OrderAny) {
381 let (filled_qty, leaves_qty, is_spawn_active) =
382 if let Some(exec_spawn_id) = order.exec_spawn_id() {
383 if let (Some(filled), Some(leaves)) = (
384 self.cache
385 .borrow()
386 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
387 self.cache
388 .borrow()
389 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
390 ) {
391 (filled, leaves, leaves.raw > 0)
392 } else {
393 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
394 return;
395 }
396 } else {
397 (order.filled_qty(), order.leaves_qty(), false)
398 };
399
400 let linked_orders = if let Some(orders) = order.linked_order_ids() {
401 orders
402 } else {
403 log::error!("No linked orders found");
404 return;
405 };
406
407 for client_order_id in linked_orders {
408 let mut contingent_order =
409 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
410 order
411 } else {
412 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
413 };
414
415 if !self.should_manage_order(&contingent_order)
416 || client_order_id == &order.client_order_id()
417 {
418 continue;
419 }
420
421 if contingent_order.is_closed() {
422 self.submit_order_commands.remove(&order.client_order_id());
423 continue;
424 }
425
426 match order.contingency_type() {
427 Some(ContingencyType::Oto) => {
428 if order.is_closed()
429 && filled_qty.raw == 0
430 && (order.exec_spawn_id().is_none() || !is_spawn_active)
431 {
432 self.cancel_order(&contingent_order);
433 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
434 self.modify_order_quantity(&mut contingent_order, filled_qty);
435 }
436 }
437 Some(ContingencyType::Oco) => {
438 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
439 self.cancel_order(&contingent_order);
440 }
441 }
442 Some(ContingencyType::Ouo) => {
443 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
444 || (order.is_closed()
445 && (order.exec_spawn_id().is_none() || !is_spawn_active))
446 {
447 self.cancel_order(&contingent_order);
448 } else if leaves_qty != contingent_order.leaves_qty() {
449 self.modify_order_quantity(&mut contingent_order, leaves_qty);
450 }
451 }
452 _ => {}
453 }
454 }
455 }
456
457 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
458 let quantity = match order.exec_spawn_id() {
459 Some(exec_spawn_id) => {
460 if let Some(qty) = self
461 .cache
462 .borrow()
463 .exec_spawn_total_quantity(&exec_spawn_id, true)
464 {
465 qty
466 } else {
467 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
468 return;
469 }
470 }
471 None => order.quantity(),
472 };
473
474 if quantity.raw == 0 {
475 return;
476 }
477
478 let linked_orders = if let Some(orders) = order.linked_order_ids() {
479 orders
480 } else {
481 log::error!("No linked orders found for contingent order");
482 return;
483 };
484
485 for client_order_id in linked_orders {
486 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
487 Some(contingent_order) => contingent_order,
488 None => panic!(
489 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
490 ),
491 };
492
493 if !self.should_manage_order(&contingent_order)
494 || client_order_id == &order.client_order_id()
495 || contingent_order.is_closed()
496 {
497 continue;
498 }
499
500 if let Some(contingency_type) = order.contingency_type() {
501 if matches!(
502 contingency_type,
503 ContingencyType::Oto | ContingencyType::Oco
504 ) && quantity != contingent_order.quantity()
505 {
506 self.modify_order_quantity(&mut contingent_order, quantity);
507 }
508 }
509 }
510 }
511
/// Placeholder for position-event handling; not implemented yet.
///
/// # Panics
///
/// Always panics: the body is `todo!()`.
pub fn handle_position_event(&mut self, _event: OrderEventAny) {
    todo!()
}
515
516 pub fn send_emulator_command(&self, command: TradingCommand) {
518 log::info!("{CMD}{SENT} {command}");
519
520 msgbus::send(&Ustr::from("OrderEmulator.execute"), &command);
521 }
522
523 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
524 log::info!("{CMD}{SENT} {command}");
525
526 let endpoint = format!("{exec_algorithm_id}.execute");
527 msgbus::send(
528 &Ustr::from(&endpoint),
529 &TradingCommand::SubmitOrder(command),
530 );
531 }
532
533 pub fn send_risk_command(&self, command: TradingCommand) {
534 log::info!("{CMD}{SENT} {command}");
535 msgbus::send(&Ustr::from("RiskEngine.execute"), &command);
536 }
537
538 pub fn send_exec_command(&self, command: TradingCommand) {
539 log::info!("{CMD}{SENT} {command}");
540 msgbus::send(&Ustr::from("ExecEngine.execute"), &command);
541 }
542
543 pub fn send_risk_event(&self, event: OrderEventAny) {
544 log::info!("{EVT}{SENT} {event}");
545 msgbus::send(&Ustr::from("RiskEngine.process"), &event);
546 }
547
548 pub fn send_exec_event(&self, event: OrderEventAny) {
549 log::info!("{EVT}{SENT} {event}");
550 msgbus::send(&Ustr::from("ExecEngine.process"), &event);
551 }
552}