1use std::{
17 cell::RefCell,
18 collections::{HashMap, HashSet},
19 fmt::Debug,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 logging::{CMD, EVT, RECV},
27 messages::execution::{
28 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::UUID4;
33use nautilus_model::{
34 data::{OrderBookDeltas, QuoteTick, TradeTick},
35 enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
36 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38 instruments::Instrument,
39 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40 types::{Price, Quantity},
41};
42
43use crate::{
44 matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45 trailing::trailing_stop_calculate,
46};
47
48pub struct OrderEmulator {
49 clock: Rc<RefCell<dyn Clock>>,
50 cache: Rc<RefCell<Cache>>,
51 manager: OrderManager,
52 matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
53 subscribed_quotes: HashSet<InstrumentId>,
54 subscribed_trades: HashSet<InstrumentId>,
55 subscribed_strategies: HashSet<StrategyId>,
56 monitored_positions: HashSet<PositionId>,
57 on_event_handler: Option<ShareableMessageHandler>,
58}
59
60impl Debug for OrderEmulator {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct(stringify!(OrderEmulator))
63 .field("cores", &self.matching_cores.len())
64 .field("subscribed_quotes", &self.subscribed_quotes.len())
65 .finish()
66 }
67}
68
69impl OrderEmulator {
70 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71 let active_local = true;
75 let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77 Self {
78 clock,
79 cache,
80 manager,
81 matching_cores: HashMap::new(),
82 subscribed_quotes: HashSet::new(),
83 subscribed_trades: HashSet::new(),
84 subscribed_strategies: HashSet::new(),
85 monitored_positions: HashSet::new(),
86 on_event_handler: None,
87 }
88 }
89
90 pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91 self.on_event_handler = Some(handler);
92 }
93
94 #[must_use]
108 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110 quotes.sort();
111 quotes
112 }
113
114 #[must_use]
115 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117 trades.sort();
118 trades
119 }
120
121 #[must_use]
122 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123 self.manager.get_submit_order_commands()
124 }
125
126 #[must_use]
127 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128 self.matching_cores.get(instrument_id).cloned()
129 }
130
131 pub fn on_start(&mut self) -> anyhow::Result<()> {
141 let emulated_orders: Vec<OrderAny> = self
142 .cache
143 .borrow()
144 .orders_emulated(None, None, None, None)
145 .into_iter()
146 .cloned()
147 .collect();
148
149 if emulated_orders.is_empty() {
150 log::error!("No emulated orders to reactivate");
151 return Ok(());
152 }
153
154 for order in emulated_orders {
155 if !matches!(
156 order.status(),
157 OrderStatus::Initialized | OrderStatus::Emulated
158 ) {
159 continue; }
161
162 if let Some(parent_order_id) = &order.parent_order_id() {
163 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164 order.clone()
165 } else {
166 log::error!("Cannot handle order: parent {parent_order_id} not found");
167 continue;
168 };
169
170 let is_position_closed = parent_order
171 .position_id()
172 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173 if parent_order.is_closed() && is_position_closed {
174 self.manager.cancel_order(&order);
175 continue; }
177
178 if parent_order.contingency_type() == Some(ContingencyType::Oto)
179 && (parent_order.is_active_local()
180 || parent_order.filled_qty() == Quantity::zero(0))
181 {
182 continue; }
184 }
185
186 let position_id = self
187 .cache
188 .borrow()
189 .position_id(&order.client_order_id())
190 .copied();
191 let client_id = self
192 .cache
193 .borrow()
194 .client_id(&order.client_order_id())
195 .copied();
196
197 let command = SubmitOrder::new(
198 order.trader_id(),
199 client_id.unwrap(),
200 order.strategy_id(),
201 order.instrument_id(),
202 order.client_order_id(),
203 order.venue_order_id().unwrap(),
204 order.clone(),
205 order.exec_algorithm_id(),
206 position_id,
207 UUID4::new(),
208 self.clock.borrow().timestamp_ns(),
209 )?;
210
211 self.handle_submit_order(command);
212 }
213
214 Ok(())
215 }
216
217 pub fn on_event(&mut self, event: OrderEventAny) {
221 log::info!("{RECV}{EVT} {event}");
222
223 self.manager.handle_event(event.clone());
224
225 if let Some(order) = self.cache.borrow().order(&event.client_order_id())
226 && order.is_closed()
227 && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
228 && let Err(e) = matching_core.delete_order(
229 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
230 )
231 {
232 log::error!("Error deleting order: {e}");
233 }
234 }
236
237 pub const fn on_stop(&self) {}
238
239 pub fn on_reset(&mut self) {
240 self.manager.reset();
241 self.matching_cores.clear();
242 }
243
244 pub const fn on_dispose(&self) {}
245
246 pub fn execute(&mut self, command: TradingCommand) {
247 log::info!("{RECV}{CMD} {command}");
248
249 match command {
250 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
251 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
252 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
253 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
254 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
255 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
256 }
257 }
258
259 fn create_matching_core(
260 &mut self,
261 instrument_id: InstrumentId,
262 price_increment: Price,
263 ) -> OrderMatchingCore {
264 let matching_core =
265 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
266 self.matching_cores
267 .insert(instrument_id, matching_core.clone());
268 log::info!("Creating matching core for {instrument_id:?}");
269 matching_core
270 }
271
272 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
276 let mut order = command.order.clone();
277 let emulation_trigger = order.emulation_trigger();
278
279 assert_ne!(
280 emulation_trigger,
281 Some(TriggerType::NoTrigger),
282 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
283 );
284 assert!(
285 self.manager
286 .get_submit_order_commands()
287 .contains_key(&order.client_order_id()),
288 "command.order.client_order_id must be in submit_order_commands"
289 );
290
291 if !matches!(
292 emulation_trigger,
293 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
294 ) {
295 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
296 self.manager.cancel_order(&order);
297 return;
298 }
299
300 self.check_monitoring(command.strategy_id, command.position_id);
301
302 let trigger_instrument_id = order
304 .trigger_instrument_id()
305 .unwrap_or_else(|| order.instrument_id());
306
307 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
308
309 let mut matching_core = if let Some(core) = matching_core {
310 core
311 } else {
312 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
314 let synthetic = self
315 .cache
316 .borrow()
317 .synthetic(&trigger_instrument_id)
318 .cloned();
319 if let Some(synthetic) = synthetic {
320 (synthetic.id, synthetic.price_increment)
321 } else {
322 log::error!(
323 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
324 );
325 self.manager.cancel_order(&order);
326 return;
327 }
328 } else {
329 let instrument = self
330 .cache
331 .borrow()
332 .instrument(&trigger_instrument_id)
333 .cloned();
334 if let Some(instrument) = instrument {
335 (instrument.id(), instrument.price_increment())
336 } else {
337 log::error!(
338 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
339 );
340 self.manager.cancel_order(&order);
341 return;
342 }
343 };
344
345 self.create_matching_core(instrument_id, price_increment)
346 };
347
348 if matches!(
350 order.order_type(),
351 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
352 ) {
353 self.update_trailing_stop_order(&mut order);
354 if order.trigger_price().is_none() {
355 log::error!(
356 "Cannot handle trailing stop order with no trigger_price and no market updates"
357 );
358
359 self.manager.cancel_order(&order);
360 return;
361 }
362 }
363
364 self.manager.cache_submit_order_command(command);
366
367 matching_core.match_order(
369 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
370 true,
371 );
372
373 match emulation_trigger.unwrap() {
375 TriggerType::Default | TriggerType::BidAsk => {
376 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
377 if !trigger_instrument_id.is_synthetic() {
378 }
381 self.subscribed_quotes.insert(trigger_instrument_id);
384 }
385 }
386 TriggerType::LastPrice => {
387 if !self.subscribed_trades.contains(&trigger_instrument_id) {
388 self.subscribed_trades.insert(trigger_instrument_id);
391 }
392 }
393 _ => {
394 log::error!("Invalid TriggerType: {emulation_trigger:?}");
395 return;
396 }
397 }
398
399 if !self
401 .manager
402 .get_submit_order_commands()
403 .contains_key(&order.client_order_id())
404 {
405 return; }
407
408 if let Err(e) = matching_core
410 .add_order(PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"))
411 {
412 log::error!("Cannot add order: {e:?}");
413 return;
414 }
415
416 if order.status() == OrderStatus::Initialized {
418 let event = OrderEmulated::new(
419 order.trader_id(),
420 order.strategy_id(),
421 order.instrument_id(),
422 order.client_order_id(),
423 UUID4::new(),
424 self.clock.borrow().timestamp_ns(),
425 self.clock.borrow().timestamp_ns(),
426 );
427
428 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
429 log::error!("Cannot apply order event: {e:?}");
430 return;
431 }
432
433 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
434 log::error!("Cannot update order: {e:?}");
435 return;
436 }
437
438 self.manager.send_risk_event(OrderEventAny::Emulated(event));
439
440 msgbus::publish(
441 format!("events.order.{}", order.strategy_id()).into(),
442 &OrderEventAny::Emulated(event),
443 );
444 }
445
446 self.matching_cores
448 .insert(trigger_instrument_id, matching_core);
449
450 log::info!("Emulating {order}");
451 }
452
453 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
454 self.check_monitoring(command.strategy_id, command.position_id);
455
456 for order in &command.order_list.orders {
457 if let Some(parent_order_id) = order.parent_order_id() {
458 let cache = self.cache.borrow();
459 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
460 parent_order
461 } else {
462 log::error!("Parent order for {} not found", order.client_order_id());
463 continue;
464 };
465
466 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
467 continue; }
469 }
470
471 if let Err(e) = self.manager.create_new_submit_order(
472 order,
473 command.position_id,
474 Some(command.client_id),
475 ) {
476 log::error!("Error creating new submit order: {e}");
477 }
478 }
479 }
480
481 fn handle_modify_order(&mut self, command: ModifyOrder) {
482 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
483 let price = match command.price {
484 Some(price) => Some(price),
485 None => order.price(),
486 };
487
488 let trigger_price = match command.trigger_price {
489 Some(trigger_price) => Some(trigger_price),
490 None => order.trigger_price(),
491 };
492
493 let ts_now = self.clock.borrow().timestamp_ns();
495 let event = OrderUpdated::new(
496 order.trader_id(),
497 order.strategy_id(),
498 order.instrument_id(),
499 order.client_order_id(),
500 command.quantity.unwrap_or(order.quantity()),
501 UUID4::new(),
502 ts_now,
503 ts_now,
504 false,
505 order.venue_order_id(),
506 order.account_id(),
507 price,
508 trigger_price,
509 );
510
511 self.manager.send_exec_event(OrderEventAny::Updated(event));
512
513 let trigger_instrument_id = order
514 .trigger_instrument_id()
515 .unwrap_or_else(|| order.instrument_id());
516
517 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
518 matching_core.match_order(
519 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
520 false,
521 );
522 } else {
523 log::error!(
524 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
525 );
526 }
527 } else {
528 log::error!("Cannot modify order: {} not found", command.client_order_id);
529 }
530 }
531
532 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
533 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
534 order.clone()
535 } else {
536 log::error!("Cannot cancel order: {} not found", command.client_order_id);
537 return;
538 };
539
540 let trigger_instrument_id = order
541 .trigger_instrument_id()
542 .unwrap_or_else(|| order.instrument_id());
543
544 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
545 core
546 } else {
547 self.manager.cancel_order(&order);
548 return;
549 };
550
551 if !matching_core.order_exists(order.client_order_id())
552 && order.is_open()
553 && !order.is_pending_cancel()
554 {
555 self.manager
557 .send_exec_command(TradingCommand::CancelOrder(command));
558 } else {
559 self.manager.cancel_order(&order);
560 }
561 }
562
563 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
564 let instrument_id = command.instrument_id;
565 let matching_core = match self.matching_cores.get(&instrument_id) {
566 Some(core) => core,
567 None => return, };
569
570 let orders_to_cancel = match command.order_side {
571 OrderSide::NoOrderSide => {
572 let mut all_orders = Vec::new();
574 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
575 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
576 all_orders
577 }
578 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
579 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
580 };
581
582 for order in orders_to_cancel {
584 self.manager.cancel_order(&OrderAny::from(order));
585 }
586 }
587
588 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
591 log::info!(
592 "Updating order {} quantity to {new_quantity}",
593 order.client_order_id(),
594 );
595
596 let ts_now = self.clock.borrow().timestamp_ns();
598 let event = OrderUpdated::new(
599 order.trader_id(),
600 order.strategy_id(),
601 order.instrument_id(),
602 order.client_order_id(),
603 new_quantity,
604 UUID4::new(),
605 ts_now,
606 ts_now,
607 false,
608 None,
609 order.account_id(),
610 None,
611 None,
612 );
613
614 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
615 log::error!("Cannot apply order event: {e:?}");
616 return;
617 }
618 if let Err(e) = self.cache.borrow_mut().update_order(order) {
619 log::error!("Cannot update order: {e:?}");
620 return;
621 }
622
623 self.manager.send_risk_event(OrderEventAny::Updated(event));
624 }
625
626 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
629 log::debug!("Processing {deltas:?}");
630
631 let instrument_id = &deltas.instrument_id;
632 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
633 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
634 let best_bid = book.best_bid_price();
635 let best_ask = book.best_ask_price();
636
637 if let Some(best_bid) = best_bid {
638 matching_core.set_bid_raw(best_bid);
639 }
640
641 if let Some(best_ask) = best_ask {
642 matching_core.set_ask_raw(best_ask);
643 }
644 } else {
645 log::error!(
646 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
647 deltas.instrument_id
648 );
649 }
650
651 self.iterate_orders(instrument_id);
652 } else {
653 log::error!(
654 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
655 deltas.instrument_id
656 );
657 }
658 }
659
660 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
661 log::debug!("Processing {quote}:?");
662
663 let instrument_id = "e.instrument_id;
664 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
665 matching_core.set_bid_raw(quote.bid_price);
666 matching_core.set_ask_raw(quote.ask_price);
667
668 self.iterate_orders(instrument_id);
669 } else {
670 log::error!(
671 "Cannot handle `QuoteTick`: no matching core for instrument {}",
672 quote.instrument_id
673 );
674 }
675 }
676
677 pub fn on_trade_tick(&mut self, trade: TradeTick) {
678 log::debug!("Processing {trade:?}");
679
680 let instrument_id = &trade.instrument_id;
681 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
682 matching_core.set_last_raw(trade.price);
683 if !self.subscribed_quotes.contains(instrument_id) {
684 matching_core.set_bid_raw(trade.price);
685 matching_core.set_ask_raw(trade.price);
686 }
687
688 self.iterate_orders(instrument_id);
689 } else {
690 log::error!(
691 "Cannot handle `TradeTick`: no matching core for instrument {}",
692 trade.instrument_id
693 );
694 }
695 }
696
697 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
698 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
699 matching_core.iterate();
700
701 matching_core.get_orders()
702 } else {
703 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
704 return;
705 };
706
707 for order in orders {
708 if order.is_closed() {
709 continue;
710 }
711
712 let mut order: OrderAny = order.clone().into();
713 if matches!(
714 order.order_type(),
715 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
716 ) {
717 self.update_trailing_stop_order(&mut order);
718 }
719 }
720 }
721
722 pub fn cancel_order(&mut self, order: &OrderAny) {
726 log::info!("Canceling order {}", order.client_order_id());
727
728 let mut order = order.clone();
729 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
730
731 let trigger_instrument_id = order
732 .trigger_instrument_id()
733 .unwrap_or(order.instrument_id());
734
735 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
736 && let Err(e) = matching_core.delete_order(
737 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
738 )
739 {
740 log::error!("Cannot delete order: {e:?}");
741 }
742
743 self.cache
744 .borrow_mut()
745 .update_order_pending_cancel_local(&order);
746
747 let ts_now = self.clock.borrow().timestamp_ns();
749 let event = OrderCanceled::new(
750 order.trader_id(),
751 order.strategy_id(),
752 order.instrument_id(),
753 order.client_order_id(),
754 UUID4::new(),
755 ts_now,
756 ts_now,
757 false,
758 order.venue_order_id(),
759 order.account_id(),
760 );
761
762 self.manager.send_exec_event(OrderEventAny::Canceled(event));
763 }
764
765 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
766 if !self.subscribed_strategies.contains(&strategy_id) {
767 if let Some(handler) = &self.on_event_handler {
769 msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
770 msgbus::subscribe_str(
771 format!("events.position.{strategy_id}"),
772 handler.clone(),
773 None,
774 );
775 self.subscribed_strategies.insert(strategy_id);
776 log::info!("Subscribed to strategy {strategy_id} order and position events");
777 }
778 }
779
780 if let Some(position_id) = position_id
781 && !self.monitored_positions.contains(&position_id)
782 {
783 self.monitored_positions.insert(position_id);
784 }
785 }
786
787 fn validate_release(
795 &self,
796 order: &OrderAny,
797 matching_core: &OrderMatchingCore,
798 trigger_instrument_id: InstrumentId,
799 ) -> Option<Price> {
800 let released_price = match order.order_side_specified() {
801 OrderSideSpecified::Buy => matching_core.ask,
802 OrderSideSpecified::Sell => matching_core.bid,
803 };
804
805 if released_price.is_none() {
806 log::warn!(
807 "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
808 order.client_order_id(),
809 );
810 return None;
811 }
812
813 Some(released_price.unwrap())
814 }
815
816 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
820 match order.order_type() {
821 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
822 self.fill_limit_order(order);
823 }
824 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
825 self.fill_market_order(order);
826 }
827 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
828 }
829 }
830
831 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
835 if matches!(order.order_type(), OrderType::Limit) {
836 self.fill_market_order(order);
837 return;
838 }
839
840 let trigger_instrument_id = order
841 .trigger_instrument_id()
842 .unwrap_or(order.instrument_id());
843
844 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
845 Some(core) => core,
846 None => {
847 log::error!(
848 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
849 );
850 return; }
852 };
853
854 let released_price =
855 match self.validate_release(order, matching_core, trigger_instrument_id) {
856 Some(price) => price,
857 None => return, };
859
860 let mut command = match self
861 .manager
862 .pop_submit_order_command(order.client_order_id())
863 {
864 Some(command) => command,
865 None => return, };
867
868 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
869 if let Err(e) = matching_core.delete_order(
870 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
871 ) {
872 log::error!("Error deleting order: {e:?}");
873 }
874
875 let emulation_trigger = TriggerType::NoTrigger;
876
877 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
879 order.trader_id(),
880 order.strategy_id(),
881 order.instrument_id(),
882 order.client_order_id(),
883 order.order_side(),
884 order.quantity(),
885 order.price().unwrap(),
886 order.time_in_force(),
887 order.expire_time(),
888 order.is_post_only(),
889 order.is_reduce_only(),
890 order.is_quote_quantity(),
891 order.display_qty(),
892 Some(emulation_trigger),
893 Some(trigger_instrument_id),
894 order.contingency_type(),
895 order.order_list_id(),
896 order.linked_order_ids().map(Vec::from),
897 order.parent_order_id(),
898 order.exec_algorithm_id(),
899 order.exec_algorithm_params().cloned(),
900 order.exec_spawn_id(),
901 order.tags().map(Vec::from),
902 UUID4::new(),
903 self.clock.borrow().timestamp_ns(),
904 ) {
905 transformed
906 } else {
907 log::error!("Cannot create limit order");
908 return;
909 };
910 transformed.liquidity_side = order.liquidity_side();
911
912 let original_events = order.events();
919
920 for event in original_events {
921 transformed.events.insert(0, event.clone());
922 }
923
924 if let Err(e) = self.cache.borrow_mut().add_order(
925 OrderAny::Limit(transformed.clone()),
926 command.position_id,
927 Some(command.client_id),
928 true,
929 ) {
930 log::error!("Failed to add order: {e}");
931 }
932
933 command.order = OrderAny::Limit(transformed.clone());
935
936 msgbus::publish(
937 format!("events.order.{}", order.strategy_id()).into(),
938 transformed.last_event(),
939 );
940
941 let event = OrderReleased::new(
943 order.trader_id(),
944 order.strategy_id(),
945 order.instrument_id(),
946 order.client_order_id(),
947 released_price,
948 UUID4::new(),
949 self.clock.borrow().timestamp_ns(),
950 self.clock.borrow().timestamp_ns(),
951 );
952
953 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
954 log::error!("Failed to apply order event: {e}");
955 }
956
957 if let Err(e) = self
958 .cache
959 .borrow_mut()
960 .update_order(&OrderAny::Limit(transformed.clone()))
961 {
962 log::error!("Failed to update order: {e}");
963 }
964
965 self.manager.send_risk_event(OrderEventAny::Released(event));
966
967 log::info!("Releasing order {}", order.client_order_id());
968
969 msgbus::publish(
971 format!("events.order.{}", transformed.strategy_id()).into(),
972 &OrderEventAny::Released(event),
973 );
974
975 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
976 self.manager.send_algo_command(command, exec_algorithm_id);
977 } else {
978 self.manager
979 .send_exec_command(TradingCommand::SubmitOrder(command));
980 }
981 }
982 }
983
984 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
988 let trigger_instrument_id = order
989 .trigger_instrument_id()
990 .unwrap_or(order.instrument_id());
991
992 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
993 Some(core) => core,
994 None => {
995 log::error!(
996 "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
997 );
998 return; }
1000 };
1001
1002 let released_price =
1003 match self.validate_release(order, matching_core, trigger_instrument_id) {
1004 Some(price) => price,
1005 None => return, };
1007
1008 let mut command = self
1009 .manager
1010 .pop_submit_order_command(order.client_order_id())
1011 .expect("invalid operation `fill_market_order` with no command");
1012
1013 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1014 if let Err(e) = matching_core.delete_order(
1015 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1016 ) {
1017 log::error!("Cannot delete order: {e:?}");
1018 }
1019
1020 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1021
1022 let mut transformed = MarketOrder::new(
1024 order.trader_id(),
1025 order.strategy_id(),
1026 order.instrument_id(),
1027 order.client_order_id(),
1028 order.order_side(),
1029 order.quantity(),
1030 order.time_in_force(),
1031 UUID4::new(),
1032 self.clock.borrow().timestamp_ns(),
1033 order.is_reduce_only(),
1034 order.is_quote_quantity(),
1035 order.contingency_type(),
1036 order.order_list_id(),
1037 order.linked_order_ids().map(Vec::from),
1038 order.parent_order_id(),
1039 order.exec_algorithm_id(),
1040 order.exec_algorithm_params().cloned(),
1041 order.exec_spawn_id(),
1042 order.tags().map(Vec::from),
1043 );
1044
1045 let original_events = order.events();
1046
1047 for event in original_events {
1048 transformed.events.insert(0, event.clone());
1049 }
1050
1051 if let Err(e) = self.cache.borrow_mut().add_order(
1052 OrderAny::Market(transformed.clone()),
1053 command.position_id,
1054 Some(command.client_id),
1055 true,
1056 ) {
1057 log::error!("Failed to add order: {e}");
1058 }
1059
1060 command.order = OrderAny::Market(transformed.clone());
1062
1063 msgbus::publish(
1064 format!("events.order.{}", order.strategy_id()).into(),
1065 transformed.last_event(),
1066 );
1067
1068 let ts_now = self.clock.borrow().timestamp_ns();
1070 let event = OrderReleased::new(
1071 order.trader_id(),
1072 order.strategy_id(),
1073 order.instrument_id(),
1074 order.client_order_id(),
1075 released_price,
1076 UUID4::new(),
1077 ts_now,
1078 ts_now,
1079 );
1080
1081 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1082 log::error!("Failed to apply order event: {e}");
1083 }
1084
1085 if let Err(e) = self
1086 .cache
1087 .borrow_mut()
1088 .update_order(&OrderAny::Market(transformed))
1089 {
1090 log::error!("Failed to update order: {e}");
1091 }
1092 self.manager.send_risk_event(OrderEventAny::Released(event));
1093
1094 log::info!("Releasing order {}", order.client_order_id());
1095
1096 msgbus::publish(
1098 format!("events.order.{}", order.strategy_id()).into(),
1099 &OrderEventAny::Released(event),
1100 );
1101
1102 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1103 self.manager.send_algo_command(command, exec_algorithm_id);
1104 } else {
1105 self.manager
1106 .send_exec_command(TradingCommand::SubmitOrder(command));
1107 }
1108 }
1109 }
1110
1111 #[allow(clippy::too_many_lines)]
1112 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1113 let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1114 log::error!(
1115 "Cannot update trailing-stop order: no matching core for instrument {}",
1116 order.instrument_id()
1117 );
1118 return;
1119 };
1120
1121 let mut bid = matching_core.bid;
1122 let mut ask = matching_core.ask;
1123 let mut last = matching_core.last;
1124
1125 if bid.is_none() || ask.is_none() || last.is_none() {
1126 if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1127 bid.get_or_insert(q.bid_price);
1128 ask.get_or_insert(q.ask_price);
1129 }
1130 if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1131 last.get_or_insert(t.price);
1132 }
1133 }
1134
1135 let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1136 matching_core.price_increment,
1137 order.trigger_price(),
1138 order.activation_price(),
1139 order,
1140 bid,
1141 ask,
1142 last,
1143 ) {
1144 Ok(pair) => pair,
1145 Err(e) => {
1146 log::warn!("Cannot calculate trailing-stop update: {e}");
1147 return;
1148 }
1149 };
1150
1151 if new_trigger_px.is_none() && new_limit_px.is_none() {
1152 return;
1153 }
1154
1155 let ts_now = self.clock.borrow().timestamp_ns();
1156 let update = OrderUpdated::new(
1157 order.trader_id(),
1158 order.strategy_id(),
1159 order.instrument_id(),
1160 order.client_order_id(),
1161 order.quantity(),
1162 UUID4::new(),
1163 ts_now,
1164 ts_now,
1165 false,
1166 order.venue_order_id(),
1167 order.account_id(),
1168 new_limit_px,
1169 new_trigger_px,
1170 );
1171 let wrapped = OrderEventAny::Updated(update);
1172 if let Err(e) = order.apply(wrapped.clone()) {
1173 log::error!("Failed to apply order event: {e}");
1174 return;
1175 }
1176 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1177 log::error!("Failed to update order in cache: {e}");
1178 return;
1179 }
1180 self.manager.send_risk_event(wrapped);
1181 }
1182}