1use std::{
17 cell::RefCell,
18 collections::{HashMap, HashSet},
19 fmt::Debug,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 logging::{CMD, EVT, RECV},
27 messages::execution::{
28 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::UUID4;
33use nautilus_model::{
34 data::{OrderBookDeltas, QuoteTick, TradeTick},
35 enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
36 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38 instruments::Instrument,
39 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40 types::{Price, Quantity},
41};
42
43use crate::{
44 matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45 trailing::trailing_stop_calculate,
46};
47
/// Emulates orders locally, holding them in per-instrument matching cores until they are
/// released to the execution pipeline.
pub struct OrderEmulator {
    /// Shared clock; read for the timestamps of generated events and commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache; source of orders, instruments, books, quotes and trades.
    cache: Rc<RefCell<Cache>>,
    /// Order manager that caches submit commands and sends events/commands onwards.
    manager: OrderManager,
    /// One matching core per instrument ID (keyed by the trigger instrument on submission).
    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
    /// Instruments recorded as needing quote data (`Default` / `BidAsk` triggers).
    subscribed_quotes: HashSet<InstrumentId>,
    /// Instruments recorded as needing trade data (`LastPrice` triggers).
    subscribed_trades: HashSet<InstrumentId>,
    /// Strategies whose order and position event topics have been subscribed to.
    subscribed_strategies: HashSet<StrategyId>,
    /// Position IDs seen on incoming submit commands.
    monitored_positions: HashSet<PositionId>,
    /// Handler registered on strategy event topics; `None` until set by the owner.
    on_event_handler: Option<ShareableMessageHandler>,
}
59
60impl Debug for OrderEmulator {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct(stringify!(OrderEmulator))
63 .field("cores", &self.matching_cores.len())
64 .field("subscribed_quotes", &self.subscribed_quotes.len())
65 .finish()
66 }
67}
68
69impl OrderEmulator {
70 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71 let active_local = true;
75 let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77 Self {
78 clock,
79 cache,
80 manager,
81 matching_cores: HashMap::new(),
82 subscribed_quotes: HashSet::new(),
83 subscribed_trades: HashSet::new(),
84 subscribed_strategies: HashSet::new(),
85 monitored_positions: HashSet::new(),
86 on_event_handler: None,
87 }
88 }
89
90 pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91 self.on_event_handler = Some(handler);
92 }
93
94 #[must_use]
108 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110 quotes.sort();
111 quotes
112 }
113
114 #[must_use]
115 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117 trades.sort();
118 trades
119 }
120
121 #[must_use]
122 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123 self.manager.get_submit_order_commands()
124 }
125
126 #[must_use]
127 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128 self.matching_cores.get(instrument_id).cloned()
129 }
130
131 pub fn on_start(&mut self) -> anyhow::Result<()> {
141 let emulated_orders: Vec<OrderAny> = self
142 .cache
143 .borrow()
144 .orders_emulated(None, None, None, None)
145 .into_iter()
146 .cloned()
147 .collect();
148
149 if emulated_orders.is_empty() {
150 log::error!("No emulated orders to reactivate");
151 return Ok(());
152 }
153
154 for order in emulated_orders {
155 if !matches!(
156 order.status(),
157 OrderStatus::Initialized | OrderStatus::Emulated
158 ) {
159 continue; }
161
162 if let Some(parent_order_id) = &order.parent_order_id() {
163 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164 order.clone()
165 } else {
166 log::error!("Cannot handle order: parent {parent_order_id} not found");
167 continue;
168 };
169
170 let is_position_closed = parent_order
171 .position_id()
172 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173 if parent_order.is_closed() && is_position_closed {
174 self.manager.cancel_order(&order);
175 continue; }
177
178 if parent_order.contingency_type() == Some(ContingencyType::Oto)
179 && (parent_order.is_active_local()
180 || parent_order.filled_qty() == Quantity::zero(0))
181 {
182 continue; }
184 }
185
186 let position_id = self
187 .cache
188 .borrow()
189 .position_id(&order.client_order_id())
190 .copied();
191 let client_id = self
192 .cache
193 .borrow()
194 .client_id(&order.client_order_id())
195 .copied();
196
197 let command = SubmitOrder::new(
198 order.trader_id(),
199 client_id.unwrap(),
200 order.strategy_id(),
201 order.instrument_id(),
202 order.client_order_id(),
203 order.venue_order_id().unwrap(),
204 order.clone(),
205 order.exec_algorithm_id(),
206 position_id,
207 None, UUID4::new(),
209 self.clock.borrow().timestamp_ns(),
210 )?;
211
212 self.handle_submit_order(command);
213 }
214
215 Ok(())
216 }
217
218 pub fn on_event(&mut self, event: OrderEventAny) {
222 log::info!("{RECV}{EVT} {event}");
223
224 self.manager.handle_event(event.clone());
225
226 if let Some(order) = self.cache.borrow().order(&event.client_order_id())
227 && order.is_closed()
228 && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
229 && let Err(e) = matching_core.delete_order(
230 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
231 )
232 {
233 log::error!("Error deleting order: {e}");
234 }
235 }
237
/// Stop hook; currently performs no work.
pub const fn on_stop(&self) {}
239
/// Resets the order manager and drops every matching core.
///
/// The recorded quote/trade/strategy subscriptions and monitored positions are
/// left untouched.
pub fn on_reset(&mut self) {
    self.manager.reset();
    self.matching_cores.clear();
}
244
/// Dispose hook; currently performs no work.
pub const fn on_dispose(&self) {}
246
247 pub fn execute(&mut self, command: TradingCommand) {
248 log::info!("{RECV}{CMD} {command}");
249
250 match command {
251 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
252 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
253 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
254 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
255 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
256 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
257 }
258 }
259
260 fn create_matching_core(
261 &mut self,
262 instrument_id: InstrumentId,
263 price_increment: Price,
264 ) -> OrderMatchingCore {
265 let matching_core =
266 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
267 self.matching_cores
268 .insert(instrument_id, matching_core.clone());
269 log::info!("Creating matching core for {instrument_id:?}");
270 matching_core
271 }
272
273 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
277 let mut order = command.order.clone();
278 let emulation_trigger = order.emulation_trigger();
279
280 assert_ne!(
281 emulation_trigger,
282 Some(TriggerType::NoTrigger),
283 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
284 );
285 assert!(
286 self.manager
287 .get_submit_order_commands()
288 .contains_key(&order.client_order_id()),
289 "command.order.client_order_id must be in submit_order_commands"
290 );
291
292 if !matches!(
293 emulation_trigger,
294 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
295 ) {
296 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
297 self.manager.cancel_order(&order);
298 return;
299 }
300
301 self.check_monitoring(command.strategy_id, command.position_id);
302
303 let trigger_instrument_id = order
305 .trigger_instrument_id()
306 .unwrap_or_else(|| order.instrument_id());
307
308 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
309
310 let mut matching_core = if let Some(core) = matching_core {
311 core
312 } else {
313 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
315 let synthetic = self
316 .cache
317 .borrow()
318 .synthetic(&trigger_instrument_id)
319 .cloned();
320 if let Some(synthetic) = synthetic {
321 (synthetic.id, synthetic.price_increment)
322 } else {
323 log::error!(
324 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
325 );
326 self.manager.cancel_order(&order);
327 return;
328 }
329 } else {
330 let instrument = self
331 .cache
332 .borrow()
333 .instrument(&trigger_instrument_id)
334 .cloned();
335 if let Some(instrument) = instrument {
336 (instrument.id(), instrument.price_increment())
337 } else {
338 log::error!(
339 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
340 );
341 self.manager.cancel_order(&order);
342 return;
343 }
344 };
345
346 self.create_matching_core(instrument_id, price_increment)
347 };
348
349 if matches!(
351 order.order_type(),
352 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
353 ) {
354 self.update_trailing_stop_order(&mut order);
355 if order.trigger_price().is_none() {
356 log::error!(
357 "Cannot handle trailing stop order with no trigger_price and no market updates"
358 );
359
360 self.manager.cancel_order(&order);
361 return;
362 }
363 }
364
365 self.manager.cache_submit_order_command(command);
367
368 matching_core.match_order(
370 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
371 true,
372 );
373
374 match emulation_trigger.unwrap() {
376 TriggerType::Default | TriggerType::BidAsk => {
377 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
378 if !trigger_instrument_id.is_synthetic() {
379 }
382 self.subscribed_quotes.insert(trigger_instrument_id);
385 }
386 }
387 TriggerType::LastPrice => {
388 if !self.subscribed_trades.contains(&trigger_instrument_id) {
389 self.subscribed_trades.insert(trigger_instrument_id);
392 }
393 }
394 _ => {
395 log::error!("Invalid TriggerType: {emulation_trigger:?}");
396 return;
397 }
398 }
399
400 if !self
402 .manager
403 .get_submit_order_commands()
404 .contains_key(&order.client_order_id())
405 {
406 return; }
408
409 if let Err(e) = matching_core
411 .add_order(PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"))
412 {
413 log::error!("Cannot add order: {e:?}");
414 return;
415 }
416
417 if order.status() == OrderStatus::Initialized {
419 let event = OrderEmulated::new(
420 order.trader_id(),
421 order.strategy_id(),
422 order.instrument_id(),
423 order.client_order_id(),
424 UUID4::new(),
425 self.clock.borrow().timestamp_ns(),
426 self.clock.borrow().timestamp_ns(),
427 );
428
429 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
430 log::error!("Cannot apply order event: {e:?}");
431 return;
432 }
433
434 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
435 log::error!("Cannot update order: {e:?}");
436 return;
437 }
438
439 self.manager.send_risk_event(OrderEventAny::Emulated(event));
440
441 msgbus::publish(
442 format!("events.order.{}", order.strategy_id()).into(),
443 &OrderEventAny::Emulated(event),
444 );
445 }
446
447 self.matching_cores
449 .insert(trigger_instrument_id, matching_core);
450
451 log::info!("Emulating {order}");
452 }
453
454 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
455 self.check_monitoring(command.strategy_id, command.position_id);
456
457 for order in &command.order_list.orders {
458 if let Some(parent_order_id) = order.parent_order_id() {
459 let cache = self.cache.borrow();
460 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
461 parent_order
462 } else {
463 log::error!("Parent order for {} not found", order.client_order_id());
464 continue;
465 };
466
467 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
468 continue; }
470 }
471
472 if let Err(e) = self.manager.create_new_submit_order(
473 order,
474 command.position_id,
475 Some(command.client_id),
476 ) {
477 log::error!("Error creating new submit order: {e}");
478 }
479 }
480 }
481
482 fn handle_modify_order(&mut self, command: ModifyOrder) {
483 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
484 let price = match command.price {
485 Some(price) => Some(price),
486 None => order.price(),
487 };
488
489 let trigger_price = match command.trigger_price {
490 Some(trigger_price) => Some(trigger_price),
491 None => order.trigger_price(),
492 };
493
494 let ts_now = self.clock.borrow().timestamp_ns();
496 let event = OrderUpdated::new(
497 order.trader_id(),
498 order.strategy_id(),
499 order.instrument_id(),
500 order.client_order_id(),
501 command.quantity.unwrap_or(order.quantity()),
502 UUID4::new(),
503 ts_now,
504 ts_now,
505 false,
506 order.venue_order_id(),
507 order.account_id(),
508 price,
509 trigger_price,
510 None,
511 );
512
513 self.manager.send_exec_event(OrderEventAny::Updated(event));
514
515 let trigger_instrument_id = order
516 .trigger_instrument_id()
517 .unwrap_or_else(|| order.instrument_id());
518
519 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
520 matching_core.match_order(
521 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
522 false,
523 );
524 } else {
525 log::error!(
526 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
527 );
528 }
529 } else {
530 log::error!("Cannot modify order: {} not found", command.client_order_id);
531 }
532 }
533
534 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
535 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
536 order.clone()
537 } else {
538 log::error!("Cannot cancel order: {} not found", command.client_order_id);
539 return;
540 };
541
542 let trigger_instrument_id = order
543 .trigger_instrument_id()
544 .unwrap_or_else(|| order.instrument_id());
545
546 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
547 core
548 } else {
549 self.manager.cancel_order(&order);
550 return;
551 };
552
553 if !matching_core.order_exists(order.client_order_id())
554 && order.is_open()
555 && !order.is_pending_cancel()
556 {
557 self.manager
559 .send_exec_command(TradingCommand::CancelOrder(command));
560 } else {
561 self.manager.cancel_order(&order);
562 }
563 }
564
565 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
566 let instrument_id = command.instrument_id;
567 let matching_core = match self.matching_cores.get(&instrument_id) {
568 Some(core) => core,
569 None => return, };
571
572 let orders_to_cancel = match command.order_side {
573 OrderSide::NoOrderSide => {
574 let mut all_orders = Vec::new();
576 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
577 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
578 all_orders
579 }
580 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
581 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
582 };
583
584 for order in orders_to_cancel {
586 self.manager.cancel_order(&OrderAny::from(order));
587 }
588 }
589
590 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
593 log::info!(
594 "Updating order {} quantity to {new_quantity}",
595 order.client_order_id(),
596 );
597
598 let ts_now = self.clock.borrow().timestamp_ns();
600 let event = OrderUpdated::new(
601 order.trader_id(),
602 order.strategy_id(),
603 order.instrument_id(),
604 order.client_order_id(),
605 new_quantity,
606 UUID4::new(),
607 ts_now,
608 ts_now,
609 false,
610 None,
611 order.account_id(),
612 None,
613 None,
614 None,
615 );
616
617 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
618 log::error!("Cannot apply order event: {e:?}");
619 return;
620 }
621 if let Err(e) = self.cache.borrow_mut().update_order(order) {
622 log::error!("Cannot update order: {e:?}");
623 return;
624 }
625
626 self.manager.send_risk_event(OrderEventAny::Updated(event));
627 }
628
629 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
632 log::debug!("Processing {deltas:?}");
633
634 let instrument_id = &deltas.instrument_id;
635 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
636 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
637 let best_bid = book.best_bid_price();
638 let best_ask = book.best_ask_price();
639
640 if let Some(best_bid) = best_bid {
641 matching_core.set_bid_raw(best_bid);
642 }
643
644 if let Some(best_ask) = best_ask {
645 matching_core.set_ask_raw(best_ask);
646 }
647 } else {
648 log::error!(
649 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
650 deltas.instrument_id
651 );
652 }
653
654 self.iterate_orders(instrument_id);
655 } else {
656 log::error!(
657 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
658 deltas.instrument_id
659 );
660 }
661 }
662
663 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
664 log::debug!("Processing {quote}:?");
665
666 let instrument_id = "e.instrument_id;
667 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
668 matching_core.set_bid_raw(quote.bid_price);
669 matching_core.set_ask_raw(quote.ask_price);
670
671 self.iterate_orders(instrument_id);
672 } else {
673 log::error!(
674 "Cannot handle `QuoteTick`: no matching core for instrument {}",
675 quote.instrument_id
676 );
677 }
678 }
679
680 pub fn on_trade_tick(&mut self, trade: TradeTick) {
681 log::debug!("Processing {trade:?}");
682
683 let instrument_id = &trade.instrument_id;
684 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
685 matching_core.set_last_raw(trade.price);
686 if !self.subscribed_quotes.contains(instrument_id) {
687 matching_core.set_bid_raw(trade.price);
688 matching_core.set_ask_raw(trade.price);
689 }
690
691 self.iterate_orders(instrument_id);
692 } else {
693 log::error!(
694 "Cannot handle `TradeTick`: no matching core for instrument {}",
695 trade.instrument_id
696 );
697 }
698 }
699
700 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
701 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
702 matching_core.iterate();
703
704 matching_core.get_orders()
705 } else {
706 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
707 return;
708 };
709
710 for order in orders {
711 if order.is_closed() {
712 continue;
713 }
714
715 let mut order: OrderAny = order.clone().into();
716 if matches!(
717 order.order_type(),
718 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
719 ) {
720 self.update_trailing_stop_order(&mut order);
721 }
722 }
723 }
724
725 pub fn cancel_order(&mut self, order: &OrderAny) {
729 log::info!("Canceling order {}", order.client_order_id());
730
731 let mut order = order.clone();
732 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
733
734 let trigger_instrument_id = order
735 .trigger_instrument_id()
736 .unwrap_or(order.instrument_id());
737
738 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
739 && let Err(e) = matching_core.delete_order(
740 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
741 )
742 {
743 log::error!("Cannot delete order: {e:?}");
744 }
745
746 self.cache
747 .borrow_mut()
748 .update_order_pending_cancel_local(&order);
749
750 let ts_now = self.clock.borrow().timestamp_ns();
752 let event = OrderCanceled::new(
753 order.trader_id(),
754 order.strategy_id(),
755 order.instrument_id(),
756 order.client_order_id(),
757 UUID4::new(),
758 ts_now,
759 ts_now,
760 false,
761 order.venue_order_id(),
762 order.account_id(),
763 );
764
765 self.manager.send_exec_event(OrderEventAny::Canceled(event));
766 }
767
768 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
769 if !self.subscribed_strategies.contains(&strategy_id) {
770 if let Some(handler) = &self.on_event_handler {
772 msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
773 msgbus::subscribe_str(
774 format!("events.position.{strategy_id}"),
775 handler.clone(),
776 None,
777 );
778 self.subscribed_strategies.insert(strategy_id);
779 log::info!("Subscribed to strategy {strategy_id} order and position events");
780 }
781 }
782
783 if let Some(position_id) = position_id
784 && !self.monitored_positions.contains(&position_id)
785 {
786 self.monitored_positions.insert(position_id);
787 }
788 }
789
790 fn validate_release(
798 &self,
799 order: &OrderAny,
800 matching_core: &OrderMatchingCore,
801 trigger_instrument_id: InstrumentId,
802 ) -> Option<Price> {
803 let released_price = match order.order_side_specified() {
804 OrderSideSpecified::Buy => matching_core.ask,
805 OrderSideSpecified::Sell => matching_core.bid,
806 };
807
808 if released_price.is_none() {
809 log::warn!(
810 "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
811 order.client_order_id(),
812 );
813 return None;
814 }
815
816 Some(released_price.unwrap())
817 }
818
819 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
823 match order.order_type() {
824 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
825 self.fill_limit_order(order);
826 }
827 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
828 self.fill_market_order(order);
829 }
830 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
831 }
832 }
833
834 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
838 if matches!(order.order_type(), OrderType::Limit) {
839 self.fill_market_order(order);
840 return;
841 }
842
843 let trigger_instrument_id = order
844 .trigger_instrument_id()
845 .unwrap_or(order.instrument_id());
846
847 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
848 Some(core) => core,
849 None => {
850 log::error!(
851 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
852 );
853 return; }
855 };
856
857 let released_price =
858 match self.validate_release(order, matching_core, trigger_instrument_id) {
859 Some(price) => price,
860 None => return, };
862
863 let mut command = match self
864 .manager
865 .pop_submit_order_command(order.client_order_id())
866 {
867 Some(command) => command,
868 None => return, };
870
871 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
872 if let Err(e) = matching_core.delete_order(
873 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
874 ) {
875 log::error!("Error deleting order: {e:?}");
876 }
877
878 let emulation_trigger = TriggerType::NoTrigger;
879
880 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
882 order.trader_id(),
883 order.strategy_id(),
884 order.instrument_id(),
885 order.client_order_id(),
886 order.order_side(),
887 order.quantity(),
888 order.price().unwrap(),
889 order.time_in_force(),
890 order.expire_time(),
891 order.is_post_only(),
892 order.is_reduce_only(),
893 order.is_quote_quantity(),
894 order.display_qty(),
895 Some(emulation_trigger),
896 Some(trigger_instrument_id),
897 order.contingency_type(),
898 order.order_list_id(),
899 order.linked_order_ids().map(Vec::from),
900 order.parent_order_id(),
901 order.exec_algorithm_id(),
902 order.exec_algorithm_params().cloned(),
903 order.exec_spawn_id(),
904 order.tags().map(Vec::from),
905 UUID4::new(),
906 self.clock.borrow().timestamp_ns(),
907 ) {
908 transformed
909 } else {
910 log::error!("Cannot create limit order");
911 return;
912 };
913 transformed.liquidity_side = order.liquidity_side();
914
915 let original_events = order.events();
922
923 for event in original_events {
924 transformed.events.insert(0, event.clone());
925 }
926
927 if let Err(e) = self.cache.borrow_mut().add_order(
928 OrderAny::Limit(transformed.clone()),
929 command.position_id,
930 Some(command.client_id),
931 true,
932 ) {
933 log::error!("Failed to add order: {e}");
934 }
935
936 command.order = OrderAny::Limit(transformed.clone());
938
939 msgbus::publish(
940 format!("events.order.{}", order.strategy_id()).into(),
941 transformed.last_event(),
942 );
943
944 let event = OrderReleased::new(
946 order.trader_id(),
947 order.strategy_id(),
948 order.instrument_id(),
949 order.client_order_id(),
950 released_price,
951 UUID4::new(),
952 self.clock.borrow().timestamp_ns(),
953 self.clock.borrow().timestamp_ns(),
954 );
955
956 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
957 log::error!("Failed to apply order event: {e}");
958 }
959
960 if let Err(e) = self
961 .cache
962 .borrow_mut()
963 .update_order(&OrderAny::Limit(transformed.clone()))
964 {
965 log::error!("Failed to update order: {e}");
966 }
967
968 self.manager.send_risk_event(OrderEventAny::Released(event));
969
970 log::info!("Releasing order {}", order.client_order_id());
971
972 msgbus::publish(
974 format!("events.order.{}", transformed.strategy_id()).into(),
975 &OrderEventAny::Released(event),
976 );
977
978 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
979 self.manager.send_algo_command(command, exec_algorithm_id);
980 } else {
981 self.manager
982 .send_exec_command(TradingCommand::SubmitOrder(command));
983 }
984 }
985 }
986
987 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
991 let trigger_instrument_id = order
992 .trigger_instrument_id()
993 .unwrap_or(order.instrument_id());
994
995 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
996 Some(core) => core,
997 None => {
998 log::error!(
999 "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
1000 );
1001 return; }
1003 };
1004
1005 let released_price =
1006 match self.validate_release(order, matching_core, trigger_instrument_id) {
1007 Some(price) => price,
1008 None => return, };
1010
1011 let mut command = self
1012 .manager
1013 .pop_submit_order_command(order.client_order_id())
1014 .expect("invalid operation `fill_market_order` with no command");
1015
1016 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1017 if let Err(e) = matching_core.delete_order(
1018 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1019 ) {
1020 log::error!("Cannot delete order: {e:?}");
1021 }
1022
1023 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1024
1025 let mut transformed = MarketOrder::new(
1027 order.trader_id(),
1028 order.strategy_id(),
1029 order.instrument_id(),
1030 order.client_order_id(),
1031 order.order_side(),
1032 order.quantity(),
1033 order.time_in_force(),
1034 UUID4::new(),
1035 self.clock.borrow().timestamp_ns(),
1036 order.is_reduce_only(),
1037 order.is_quote_quantity(),
1038 order.contingency_type(),
1039 order.order_list_id(),
1040 order.linked_order_ids().map(Vec::from),
1041 order.parent_order_id(),
1042 order.exec_algorithm_id(),
1043 order.exec_algorithm_params().cloned(),
1044 order.exec_spawn_id(),
1045 order.tags().map(Vec::from),
1046 );
1047
1048 let original_events = order.events();
1049
1050 for event in original_events {
1051 transformed.events.insert(0, event.clone());
1052 }
1053
1054 if let Err(e) = self.cache.borrow_mut().add_order(
1055 OrderAny::Market(transformed.clone()),
1056 command.position_id,
1057 Some(command.client_id),
1058 true,
1059 ) {
1060 log::error!("Failed to add order: {e}");
1061 }
1062
1063 command.order = OrderAny::Market(transformed.clone());
1065
1066 msgbus::publish(
1067 format!("events.order.{}", order.strategy_id()).into(),
1068 transformed.last_event(),
1069 );
1070
1071 let ts_now = self.clock.borrow().timestamp_ns();
1073 let event = OrderReleased::new(
1074 order.trader_id(),
1075 order.strategy_id(),
1076 order.instrument_id(),
1077 order.client_order_id(),
1078 released_price,
1079 UUID4::new(),
1080 ts_now,
1081 ts_now,
1082 );
1083
1084 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1085 log::error!("Failed to apply order event: {e}");
1086 }
1087
1088 if let Err(e) = self
1089 .cache
1090 .borrow_mut()
1091 .update_order(&OrderAny::Market(transformed))
1092 {
1093 log::error!("Failed to update order: {e}");
1094 }
1095 self.manager.send_risk_event(OrderEventAny::Released(event));
1096
1097 log::info!("Releasing order {}", order.client_order_id());
1098
1099 msgbus::publish(
1101 format!("events.order.{}", order.strategy_id()).into(),
1102 &OrderEventAny::Released(event),
1103 );
1104
1105 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1106 self.manager.send_algo_command(command, exec_algorithm_id);
1107 } else {
1108 self.manager
1109 .send_exec_command(TradingCommand::SubmitOrder(command));
1110 }
1111 }
1112 }
1113
1114 #[allow(clippy::too_many_lines)]
1115 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1116 let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1117 log::error!(
1118 "Cannot update trailing-stop order: no matching core for instrument {}",
1119 order.instrument_id()
1120 );
1121 return;
1122 };
1123
1124 let mut bid = matching_core.bid;
1125 let mut ask = matching_core.ask;
1126 let mut last = matching_core.last;
1127
1128 if bid.is_none() || ask.is_none() || last.is_none() {
1129 if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1130 bid.get_or_insert(q.bid_price);
1131 ask.get_or_insert(q.ask_price);
1132 }
1133 if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1134 last.get_or_insert(t.price);
1135 }
1136 }
1137
1138 let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1139 matching_core.price_increment,
1140 order.trigger_price(),
1141 order.activation_price(),
1142 order,
1143 bid,
1144 ask,
1145 last,
1146 ) {
1147 Ok(pair) => pair,
1148 Err(e) => {
1149 log::warn!("Cannot calculate trailing-stop update: {e}");
1150 return;
1151 }
1152 };
1153
1154 if new_trigger_px.is_none() && new_limit_px.is_none() {
1155 return;
1156 }
1157
1158 let ts_now = self.clock.borrow().timestamp_ns();
1159 let update = OrderUpdated::new(
1160 order.trader_id(),
1161 order.strategy_id(),
1162 order.instrument_id(),
1163 order.client_order_id(),
1164 order.quantity(),
1165 UUID4::new(),
1166 ts_now,
1167 ts_now,
1168 false,
1169 order.venue_order_id(),
1170 order.account_id(),
1171 new_limit_px,
1172 new_trigger_px,
1173 None,
1174 );
1175 let wrapped = OrderEventAny::Updated(update);
1176 if let Err(e) = order.apply(wrapped.clone()) {
1177 log::error!("Failed to apply order event: {e}");
1178 return;
1179 }
1180 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1181 log::error!("Failed to update order in cache: {e}");
1182 return;
1183 }
1184 self.manager.send_risk_event(wrapped);
1185 }
1186}