1use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use ahash::{AHashMap, AHashSet};
19use nautilus_common::{
20 cache::Cache,
21 clock::Clock,
22 logging::{CMD, EVT, RECV},
23 messages::execution::{
24 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
25 },
26 msgbus::{self, handler::ShareableMessageHandler},
27};
28use nautilus_core::UUID4;
29use nautilus_model::{
30 data::{OrderBookDeltas, QuoteTick, TradeTick},
31 enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
32 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
33 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
34 instruments::Instrument,
35 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
36 types::{Price, Quantity},
37};
38
39use crate::{
40 matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
41 trailing::trailing_stop_calculate,
42};
43
44pub struct OrderEmulator {
45 clock: Rc<RefCell<dyn Clock>>,
46 cache: Rc<RefCell<Cache>>,
47 manager: OrderManager,
48 matching_cores: AHashMap<InstrumentId, OrderMatchingCore>,
49 subscribed_quotes: AHashSet<InstrumentId>,
50 subscribed_trades: AHashSet<InstrumentId>,
51 subscribed_strategies: AHashSet<StrategyId>,
52 monitored_positions: AHashSet<PositionId>,
53 on_event_handler: Option<ShareableMessageHandler>,
54}
55
56impl Debug for OrderEmulator {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct(stringify!(OrderEmulator))
59 .field("cores", &self.matching_cores.len())
60 .field("subscribed_quotes", &self.subscribed_quotes.len())
61 .finish()
62 }
63}
64
65impl OrderEmulator {
66 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
67 let active_local = true;
71 let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
72
73 Self {
74 clock,
75 cache,
76 manager,
77 matching_cores: AHashMap::new(),
78 subscribed_quotes: AHashSet::new(),
79 subscribed_trades: AHashSet::new(),
80 subscribed_strategies: AHashSet::new(),
81 monitored_positions: AHashSet::new(),
82 on_event_handler: None,
83 }
84 }
85
86 pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
87 self.on_event_handler = Some(handler);
88 }
89
90 #[must_use]
104 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
105 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
106 quotes.sort();
107 quotes
108 }
109
110 #[must_use]
111 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
112 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
113 trades.sort();
114 trades
115 }
116
117 #[must_use]
118 pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
119 self.manager.get_submit_order_commands()
120 }
121
122 #[must_use]
123 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
124 self.matching_cores.get(instrument_id).cloned()
125 }
126
127 pub fn on_start(&mut self) -> anyhow::Result<()> {
137 let emulated_orders: Vec<OrderAny> = self
138 .cache
139 .borrow()
140 .orders_emulated(None, None, None, None)
141 .into_iter()
142 .cloned()
143 .collect();
144
145 if emulated_orders.is_empty() {
146 log::error!("No emulated orders to reactivate");
147 return Ok(());
148 }
149
150 for order in emulated_orders {
151 if !matches!(
152 order.status(),
153 OrderStatus::Initialized | OrderStatus::Emulated
154 ) {
155 continue; }
157
158 if let Some(parent_order_id) = &order.parent_order_id() {
159 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
160 order.clone()
161 } else {
162 log::error!("Cannot handle order: parent {parent_order_id} not found");
163 continue;
164 };
165
166 let is_position_closed = parent_order
167 .position_id()
168 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
169 if parent_order.is_closed() && is_position_closed {
170 self.manager.cancel_order(&order);
171 continue; }
173
174 if parent_order.contingency_type() == Some(ContingencyType::Oto)
175 && (parent_order.is_active_local()
176 || parent_order.filled_qty() == Quantity::zero(0))
177 {
178 continue; }
180 }
181
182 let position_id = self
183 .cache
184 .borrow()
185 .position_id(&order.client_order_id())
186 .copied();
187 let client_id = self
188 .cache
189 .borrow()
190 .client_id(&order.client_order_id())
191 .copied();
192
193 let command = SubmitOrder::new(
194 order.trader_id(),
195 client_id,
196 order.strategy_id(),
197 order.instrument_id(),
198 order.clone(),
199 order.exec_algorithm_id(),
200 position_id,
201 None, UUID4::new(),
203 self.clock.borrow().timestamp_ns(),
204 );
205
206 self.handle_submit_order(command);
207 }
208
209 Ok(())
210 }
211
212 pub fn on_event(&mut self, event: OrderEventAny) {
216 log::info!("{RECV}{EVT} {event}");
217
218 self.manager.handle_event(event.clone());
219
220 if let Some(order) = self.cache.borrow().order(&event.client_order_id())
221 && order.is_closed()
222 && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
223 && let Err(e) = matching_core.delete_order(
224 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
225 )
226 {
227 log::error!("Error deleting order: {e}");
228 }
229 }
231
232 pub const fn on_stop(&self) {}
233
234 pub fn on_reset(&mut self) {
235 self.manager.reset();
236 self.matching_cores.clear();
237 }
238
239 pub const fn on_dispose(&self) {}
240
241 pub fn execute(&mut self, command: TradingCommand) {
242 log::info!("{RECV}{CMD} {command}");
243
244 match command {
245 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
246 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
247 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
248 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
249 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
250 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
251 }
252 }
253
254 fn create_matching_core(
255 &mut self,
256 instrument_id: InstrumentId,
257 price_increment: Price,
258 ) -> OrderMatchingCore {
259 let matching_core =
260 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
261 self.matching_cores
262 .insert(instrument_id, matching_core.clone());
263 log::info!("Creating matching core for {instrument_id:?}");
264 matching_core
265 }
266
267 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
271 let mut order = command.order.clone();
272 let emulation_trigger = order.emulation_trigger();
273
274 assert_ne!(
275 emulation_trigger,
276 Some(TriggerType::NoTrigger),
277 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
278 );
279 assert!(
280 self.manager
281 .get_submit_order_commands()
282 .contains_key(&order.client_order_id()),
283 "command.order.client_order_id must be in submit_order_commands"
284 );
285
286 if !matches!(
287 emulation_trigger,
288 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
289 ) {
290 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
291 self.manager.cancel_order(&order);
292 return;
293 }
294
295 self.check_monitoring(command.strategy_id, command.position_id);
296
297 let trigger_instrument_id = order
299 .trigger_instrument_id()
300 .unwrap_or_else(|| order.instrument_id());
301
302 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
303
304 let mut matching_core = if let Some(core) = matching_core {
305 core
306 } else {
307 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
309 let synthetic = self
310 .cache
311 .borrow()
312 .synthetic(&trigger_instrument_id)
313 .cloned();
314 if let Some(synthetic) = synthetic {
315 (synthetic.id, synthetic.price_increment)
316 } else {
317 log::error!(
318 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
319 );
320 self.manager.cancel_order(&order);
321 return;
322 }
323 } else {
324 let instrument = self
325 .cache
326 .borrow()
327 .instrument(&trigger_instrument_id)
328 .cloned();
329 if let Some(instrument) = instrument {
330 (instrument.id(), instrument.price_increment())
331 } else {
332 log::error!(
333 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
334 );
335 self.manager.cancel_order(&order);
336 return;
337 }
338 };
339
340 self.create_matching_core(instrument_id, price_increment)
341 };
342
343 if matches!(
345 order.order_type(),
346 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
347 ) {
348 self.update_trailing_stop_order(&mut order);
349 if order.trigger_price().is_none() {
350 log::error!(
351 "Cannot handle trailing stop order with no trigger_price and no market updates"
352 );
353
354 self.manager.cancel_order(&order);
355 return;
356 }
357 }
358
359 self.manager.cache_submit_order_command(command);
361
362 matching_core.match_order(
364 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
365 true,
366 );
367
368 match emulation_trigger.unwrap() {
370 TriggerType::Default | TriggerType::BidAsk => {
371 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
372 if !trigger_instrument_id.is_synthetic() {
373 }
376 self.subscribed_quotes.insert(trigger_instrument_id);
379 }
380 }
381 TriggerType::LastPrice => {
382 if !self.subscribed_trades.contains(&trigger_instrument_id) {
383 self.subscribed_trades.insert(trigger_instrument_id);
386 }
387 }
388 _ => {
389 log::error!("Invalid TriggerType: {emulation_trigger:?}");
390 return;
391 }
392 }
393
394 if !self
396 .manager
397 .get_submit_order_commands()
398 .contains_key(&order.client_order_id())
399 {
400 return; }
402
403 if let Err(e) = matching_core
405 .add_order(PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"))
406 {
407 log::error!("Cannot add order: {e:?}");
408 return;
409 }
410
411 if order.status() == OrderStatus::Initialized {
413 let event = OrderEmulated::new(
414 order.trader_id(),
415 order.strategy_id(),
416 order.instrument_id(),
417 order.client_order_id(),
418 UUID4::new(),
419 self.clock.borrow().timestamp_ns(),
420 self.clock.borrow().timestamp_ns(),
421 );
422
423 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
424 log::error!("Cannot apply order event: {e:?}");
425 return;
426 }
427
428 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
429 log::error!("Cannot update order: {e:?}");
430 return;
431 }
432
433 self.manager.send_risk_event(OrderEventAny::Emulated(event));
434
435 msgbus::publish(
436 format!("events.order.{}", order.strategy_id()).into(),
437 &OrderEventAny::Emulated(event),
438 );
439 }
440
441 self.matching_cores
443 .insert(trigger_instrument_id, matching_core);
444
445 log::info!("Emulating {order}");
446 }
447
448 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
449 self.check_monitoring(command.strategy_id, command.position_id);
450
451 for order in &command.order_list.orders {
452 if let Some(parent_order_id) = order.parent_order_id() {
453 let cache = self.cache.borrow();
454 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
455 parent_order
456 } else {
457 log::error!("Parent order for {} not found", order.client_order_id());
458 continue;
459 };
460
461 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
462 continue; }
464 }
465
466 if let Err(e) =
467 self.manager
468 .create_new_submit_order(order, command.position_id, command.client_id)
469 {
470 log::error!("Error creating new submit order: {e}");
471 }
472 }
473 }
474
475 fn handle_modify_order(&mut self, command: ModifyOrder) {
476 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
477 let price = match command.price {
478 Some(price) => Some(price),
479 None => order.price(),
480 };
481
482 let trigger_price = match command.trigger_price {
483 Some(trigger_price) => Some(trigger_price),
484 None => order.trigger_price(),
485 };
486
487 let ts_now = self.clock.borrow().timestamp_ns();
489 let event = OrderUpdated::new(
490 order.trader_id(),
491 order.strategy_id(),
492 order.instrument_id(),
493 order.client_order_id(),
494 command.quantity.unwrap_or(order.quantity()),
495 UUID4::new(),
496 ts_now,
497 ts_now,
498 false,
499 order.venue_order_id(),
500 order.account_id(),
501 price,
502 trigger_price,
503 None,
504 );
505
506 self.manager.send_exec_event(OrderEventAny::Updated(event));
507
508 let trigger_instrument_id = order
509 .trigger_instrument_id()
510 .unwrap_or_else(|| order.instrument_id());
511
512 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
513 matching_core.match_order(
514 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
515 false,
516 );
517 } else {
518 log::error!(
519 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
520 );
521 }
522 } else {
523 log::error!("Cannot modify order: {} not found", command.client_order_id);
524 }
525 }
526
527 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
528 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
529 order.clone()
530 } else {
531 log::error!("Cannot cancel order: {} not found", command.client_order_id);
532 return;
533 };
534
535 let trigger_instrument_id = order
536 .trigger_instrument_id()
537 .unwrap_or_else(|| order.instrument_id());
538
539 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
540 core
541 } else {
542 self.manager.cancel_order(&order);
543 return;
544 };
545
546 if !matching_core.order_exists(order.client_order_id())
547 && order.is_open()
548 && !order.is_pending_cancel()
549 {
550 self.manager
552 .send_exec_command(TradingCommand::CancelOrder(command));
553 } else {
554 self.manager.cancel_order(&order);
555 }
556 }
557
558 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
559 let instrument_id = command.instrument_id;
560 let matching_core = match self.matching_cores.get(&instrument_id) {
561 Some(core) => core,
562 None => return, };
564
565 let orders_to_cancel = match command.order_side {
566 OrderSide::NoOrderSide => {
567 let mut all_orders = Vec::new();
569 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
570 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
571 all_orders
572 }
573 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
574 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
575 };
576
577 for order in orders_to_cancel {
579 self.manager.cancel_order(&OrderAny::from(order));
580 }
581 }
582
583 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
586 log::info!(
587 "Updating order {} quantity to {new_quantity}",
588 order.client_order_id(),
589 );
590
591 let ts_now = self.clock.borrow().timestamp_ns();
593 let event = OrderUpdated::new(
594 order.trader_id(),
595 order.strategy_id(),
596 order.instrument_id(),
597 order.client_order_id(),
598 new_quantity,
599 UUID4::new(),
600 ts_now,
601 ts_now,
602 false,
603 None,
604 order.account_id(),
605 None,
606 None,
607 None,
608 );
609
610 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
611 log::error!("Cannot apply order event: {e:?}");
612 return;
613 }
614 if let Err(e) = self.cache.borrow_mut().update_order(order) {
615 log::error!("Cannot update order: {e:?}");
616 return;
617 }
618
619 self.manager.send_risk_event(OrderEventAny::Updated(event));
620 }
621
622 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
625 log::debug!("Processing {deltas:?}");
626
627 let instrument_id = &deltas.instrument_id;
628 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
629 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
630 let best_bid = book.best_bid_price();
631 let best_ask = book.best_ask_price();
632
633 if let Some(best_bid) = best_bid {
634 matching_core.set_bid_raw(best_bid);
635 }
636
637 if let Some(best_ask) = best_ask {
638 matching_core.set_ask_raw(best_ask);
639 }
640 } else {
641 log::error!(
642 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
643 deltas.instrument_id
644 );
645 }
646
647 self.iterate_orders(instrument_id);
648 } else {
649 log::error!(
650 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
651 deltas.instrument_id
652 );
653 }
654 }
655
656 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
657 log::debug!("Processing {quote}:?");
658
659 let instrument_id = "e.instrument_id;
660 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
661 matching_core.set_bid_raw(quote.bid_price);
662 matching_core.set_ask_raw(quote.ask_price);
663
664 self.iterate_orders(instrument_id);
665 } else {
666 log::error!(
667 "Cannot handle `QuoteTick`: no matching core for instrument {}",
668 quote.instrument_id
669 );
670 }
671 }
672
673 pub fn on_trade_tick(&mut self, trade: TradeTick) {
674 log::debug!("Processing {trade:?}");
675
676 let instrument_id = &trade.instrument_id;
677 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
678 matching_core.set_last_raw(trade.price);
679 if !self.subscribed_quotes.contains(instrument_id) {
680 matching_core.set_bid_raw(trade.price);
681 matching_core.set_ask_raw(trade.price);
682 }
683
684 self.iterate_orders(instrument_id);
685 } else {
686 log::error!(
687 "Cannot handle `TradeTick`: no matching core for instrument {}",
688 trade.instrument_id
689 );
690 }
691 }
692
693 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
694 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
695 matching_core.iterate();
696
697 matching_core.get_orders()
698 } else {
699 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
700 return;
701 };
702
703 for order in orders {
704 if order.is_closed() {
705 continue;
706 }
707
708 let mut order: OrderAny = order.clone().into();
709 if matches!(
710 order.order_type(),
711 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
712 ) {
713 self.update_trailing_stop_order(&mut order);
714 }
715 }
716 }
717
718 pub fn cancel_order(&mut self, order: &OrderAny) {
722 log::info!("Canceling order {}", order.client_order_id());
723
724 let mut order = order.clone();
725 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
726
727 let trigger_instrument_id = order
728 .trigger_instrument_id()
729 .unwrap_or(order.instrument_id());
730
731 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
732 && let Err(e) = matching_core.delete_order(
733 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
734 )
735 {
736 log::error!("Cannot delete order: {e:?}");
737 }
738
739 self.cache
740 .borrow_mut()
741 .update_order_pending_cancel_local(&order);
742
743 let ts_now = self.clock.borrow().timestamp_ns();
745 let event = OrderCanceled::new(
746 order.trader_id(),
747 order.strategy_id(),
748 order.instrument_id(),
749 order.client_order_id(),
750 UUID4::new(),
751 ts_now,
752 ts_now,
753 false,
754 order.venue_order_id(),
755 order.account_id(),
756 );
757
758 self.manager.send_exec_event(OrderEventAny::Canceled(event));
759 }
760
761 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
762 if !self.subscribed_strategies.contains(&strategy_id) {
763 if let Some(handler) = &self.on_event_handler {
765 msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
766 msgbus::subscribe_str(
767 format!("events.position.{strategy_id}"),
768 handler.clone(),
769 None,
770 );
771 self.subscribed_strategies.insert(strategy_id);
772 log::info!("Subscribed to strategy {strategy_id} order and position events");
773 }
774 }
775
776 if let Some(position_id) = position_id
777 && !self.monitored_positions.contains(&position_id)
778 {
779 self.monitored_positions.insert(position_id);
780 }
781 }
782
783 fn validate_release(
791 &self,
792 order: &OrderAny,
793 matching_core: &OrderMatchingCore,
794 trigger_instrument_id: InstrumentId,
795 ) -> Option<Price> {
796 let released_price = match order.order_side_specified() {
797 OrderSideSpecified::Buy => matching_core.ask,
798 OrderSideSpecified::Sell => matching_core.bid,
799 };
800
801 if released_price.is_none() {
802 log::warn!(
803 "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
804 order.client_order_id(),
805 );
806 return None;
807 }
808
809 Some(released_price.unwrap())
810 }
811
812 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
816 match order.order_type() {
817 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
818 self.fill_limit_order(order);
819 }
820 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
821 self.fill_market_order(order);
822 }
823 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
824 }
825 }
826
827 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
831 if matches!(order.order_type(), OrderType::Limit) {
832 self.fill_market_order(order);
833 return;
834 }
835
836 let trigger_instrument_id = order
837 .trigger_instrument_id()
838 .unwrap_or(order.instrument_id());
839
840 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
841 Some(core) => core,
842 None => {
843 log::error!(
844 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
845 );
846 return; }
848 };
849
850 let released_price =
851 match self.validate_release(order, matching_core, trigger_instrument_id) {
852 Some(price) => price,
853 None => return, };
855
856 let mut command = match self
857 .manager
858 .pop_submit_order_command(order.client_order_id())
859 {
860 Some(command) => command,
861 None => return, };
863
864 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
865 if let Err(e) = matching_core.delete_order(
866 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
867 ) {
868 log::error!("Error deleting order: {e:?}");
869 }
870
871 let emulation_trigger = TriggerType::NoTrigger;
872
873 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
875 order.trader_id(),
876 order.strategy_id(),
877 order.instrument_id(),
878 order.client_order_id(),
879 order.order_side(),
880 order.quantity(),
881 order.price().unwrap(),
882 order.time_in_force(),
883 order.expire_time(),
884 order.is_post_only(),
885 order.is_reduce_only(),
886 order.is_quote_quantity(),
887 order.display_qty(),
888 Some(emulation_trigger),
889 Some(trigger_instrument_id),
890 order.contingency_type(),
891 order.order_list_id(),
892 order.linked_order_ids().map(Vec::from),
893 order.parent_order_id(),
894 order.exec_algorithm_id(),
895 order.exec_algorithm_params().cloned(),
896 order.exec_spawn_id(),
897 order.tags().map(Vec::from),
898 UUID4::new(),
899 self.clock.borrow().timestamp_ns(),
900 ) {
901 transformed
902 } else {
903 log::error!("Cannot create limit order");
904 return;
905 };
906 transformed.liquidity_side = order.liquidity_side();
907
908 let original_events = order.events();
915
916 for event in original_events {
917 transformed.events.insert(0, event.clone());
918 }
919
920 if let Err(e) = self.cache.borrow_mut().add_order(
921 OrderAny::Limit(transformed.clone()),
922 command.position_id,
923 command.client_id,
924 true,
925 ) {
926 log::error!("Failed to add order: {e}");
927 }
928
929 command.order = OrderAny::Limit(transformed.clone());
931
932 msgbus::publish(
933 format!("events.order.{}", order.strategy_id()).into(),
934 transformed.last_event(),
935 );
936
937 let event = OrderReleased::new(
939 order.trader_id(),
940 order.strategy_id(),
941 order.instrument_id(),
942 order.client_order_id(),
943 released_price,
944 UUID4::new(),
945 self.clock.borrow().timestamp_ns(),
946 self.clock.borrow().timestamp_ns(),
947 );
948
949 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
950 log::error!("Failed to apply order event: {e}");
951 }
952
953 if let Err(e) = self
954 .cache
955 .borrow_mut()
956 .update_order(&OrderAny::Limit(transformed.clone()))
957 {
958 log::error!("Failed to update order: {e}");
959 }
960
961 self.manager.send_risk_event(OrderEventAny::Released(event));
962
963 log::info!("Releasing order {}", order.client_order_id());
964
965 msgbus::publish(
967 format!("events.order.{}", transformed.strategy_id()).into(),
968 &OrderEventAny::Released(event),
969 );
970
971 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
972 self.manager.send_algo_command(command, exec_algorithm_id);
973 } else {
974 self.manager
975 .send_exec_command(TradingCommand::SubmitOrder(command));
976 }
977 }
978 }
979
980 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
984 let trigger_instrument_id = order
985 .trigger_instrument_id()
986 .unwrap_or(order.instrument_id());
987
988 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
989 Some(core) => core,
990 None => {
991 log::error!(
992 "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
993 );
994 return; }
996 };
997
998 let released_price =
999 match self.validate_release(order, matching_core, trigger_instrument_id) {
1000 Some(price) => price,
1001 None => return, };
1003
1004 let mut command = self
1005 .manager
1006 .pop_submit_order_command(order.client_order_id())
1007 .expect("invalid operation `fill_market_order` with no command");
1008
1009 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1010 if let Err(e) = matching_core.delete_order(
1011 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1012 ) {
1013 log::error!("Cannot delete order: {e:?}");
1014 }
1015
1016 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1017
1018 let mut transformed = MarketOrder::new(
1020 order.trader_id(),
1021 order.strategy_id(),
1022 order.instrument_id(),
1023 order.client_order_id(),
1024 order.order_side(),
1025 order.quantity(),
1026 order.time_in_force(),
1027 UUID4::new(),
1028 self.clock.borrow().timestamp_ns(),
1029 order.is_reduce_only(),
1030 order.is_quote_quantity(),
1031 order.contingency_type(),
1032 order.order_list_id(),
1033 order.linked_order_ids().map(Vec::from),
1034 order.parent_order_id(),
1035 order.exec_algorithm_id(),
1036 order.exec_algorithm_params().cloned(),
1037 order.exec_spawn_id(),
1038 order.tags().map(Vec::from),
1039 );
1040
1041 let original_events = order.events();
1042
1043 for event in original_events {
1044 transformed.events.insert(0, event.clone());
1045 }
1046
1047 if let Err(e) = self.cache.borrow_mut().add_order(
1048 OrderAny::Market(transformed.clone()),
1049 command.position_id,
1050 command.client_id,
1051 true,
1052 ) {
1053 log::error!("Failed to add order: {e}");
1054 }
1055
1056 command.order = OrderAny::Market(transformed.clone());
1058
1059 msgbus::publish(
1060 format!("events.order.{}", order.strategy_id()).into(),
1061 transformed.last_event(),
1062 );
1063
1064 let ts_now = self.clock.borrow().timestamp_ns();
1066 let event = OrderReleased::new(
1067 order.trader_id(),
1068 order.strategy_id(),
1069 order.instrument_id(),
1070 order.client_order_id(),
1071 released_price,
1072 UUID4::new(),
1073 ts_now,
1074 ts_now,
1075 );
1076
1077 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1078 log::error!("Failed to apply order event: {e}");
1079 }
1080
1081 if let Err(e) = self
1082 .cache
1083 .borrow_mut()
1084 .update_order(&OrderAny::Market(transformed))
1085 {
1086 log::error!("Failed to update order: {e}");
1087 }
1088 self.manager.send_risk_event(OrderEventAny::Released(event));
1089
1090 log::info!("Releasing order {}", order.client_order_id());
1091
1092 msgbus::publish(
1094 format!("events.order.{}", order.strategy_id()).into(),
1095 &OrderEventAny::Released(event),
1096 );
1097
1098 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1099 self.manager.send_algo_command(command, exec_algorithm_id);
1100 } else {
1101 self.manager
1102 .send_exec_command(TradingCommand::SubmitOrder(command));
1103 }
1104 }
1105 }
1106
1107 #[allow(clippy::too_many_lines)]
1108 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1109 let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1110 log::error!(
1111 "Cannot update trailing-stop order: no matching core for instrument {}",
1112 order.instrument_id()
1113 );
1114 return;
1115 };
1116
1117 let mut bid = matching_core.bid;
1118 let mut ask = matching_core.ask;
1119 let mut last = matching_core.last;
1120
1121 if bid.is_none() || ask.is_none() || last.is_none() {
1122 if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1123 bid.get_or_insert(q.bid_price);
1124 ask.get_or_insert(q.ask_price);
1125 }
1126 if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1127 last.get_or_insert(t.price);
1128 }
1129 }
1130
1131 let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1132 matching_core.price_increment,
1133 order.trigger_price(),
1134 order.activation_price(),
1135 order,
1136 bid,
1137 ask,
1138 last,
1139 ) {
1140 Ok(pair) => pair,
1141 Err(e) => {
1142 log::warn!("Cannot calculate trailing-stop update: {e}");
1143 return;
1144 }
1145 };
1146
1147 if new_trigger_px.is_none() && new_limit_px.is_none() {
1148 return;
1149 }
1150
1151 let ts_now = self.clock.borrow().timestamp_ns();
1152 let update = OrderUpdated::new(
1153 order.trader_id(),
1154 order.strategy_id(),
1155 order.instrument_id(),
1156 order.client_order_id(),
1157 order.quantity(),
1158 UUID4::new(),
1159 ts_now,
1160 ts_now,
1161 false,
1162 order.venue_order_id(),
1163 order.account_id(),
1164 new_limit_px,
1165 new_trigger_px,
1166 None,
1167 );
1168 let wrapped = OrderEventAny::Updated(update);
1169 if let Err(e) = order.apply(wrapped.clone()) {
1170 log::error!("Failed to apply order event: {e}");
1171 return;
1172 }
1173 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1174 log::error!("Failed to update order in cache: {e}");
1175 return;
1176 }
1177 self.manager.send_risk_event(wrapped);
1178 }
1179}