1use std::{
17 cell::RefCell,
18 fmt::Debug,
19 ops::{Deref, DerefMut},
20 rc::Rc,
21};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25 actor::{DataActorConfig, DataActorCore},
26 cache::Cache,
27 clock::Clock,
28 logging::{CMD, EVT, RECV},
29 messages::execution::{
30 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
31 },
32 msgbus::{
33 self, TypedHandler,
34 switchboard::{get_quotes_topic, get_trades_topic},
35 },
36};
37use nautilus_core::{UUID4, WeakCell};
38use nautilus_model::{
39 data::{OrderBookDeltas, QuoteTick, TradeTick},
40 enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
41 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
42 identifiers::{ActorId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId},
43 instruments::Instrument,
44 orders::{LimitOrder, MarketOrder, Order, OrderAny},
45 types::{Price, Quantity},
46};
47
48use crate::{
49 matching_core::{OrderMatchInfo, OrderMatchingCore},
50 order_manager::{
51 handlers::{CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny},
52 manager::OrderManager,
53 },
54 trailing::trailing_stop_calculate,
55};
56
/// Holds orders locally and releases them as market or limit orders once
/// market data for their trigger instrument allows it.
pub struct OrderEmulator {
    /// Underlying actor core; also the `Deref`/`DerefMut` target of the emulator.
    actor: DataActorCore,
    /// Shared clock used to timestamp generated events and commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up and update orders, instruments, quotes and trades.
    cache: Rc<RefCell<Cache>>,
    /// Order manager that stores submit commands and dispatches events/commands.
    manager: OrderManager,
    /// One matching core per trigger instrument.
    matching_cores: AHashMap<InstrumentId, OrderMatchingCore>,
    /// Instruments for which a quote subscription has been requested.
    subscribed_quotes: AHashSet<InstrumentId>,
    /// Instruments for which a trade subscription has been requested.
    subscribed_trades: AHashSet<InstrumentId>,
    /// Strategies whose order-event topic has been subscribed to.
    subscribed_strategies: AHashSet<StrategyId>,
    /// Position IDs seen on submitted commands.
    monitored_positions: AHashSet<PositionId>,
    /// Handler registered on strategy order-event topics, if one has been set.
    on_event_handler: Option<TypedHandler<OrderEventAny>>,
    /// Weak reference back to this emulator, captured by market-data handlers.
    self_ref: Option<WeakCell<Self>>,
}
70
71impl Debug for OrderEmulator {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct(stringify!(OrderEmulator))
74 .field("actor", &self.actor)
75 .field("cores", &self.matching_cores.len())
76 .field("subscribed_quotes", &self.subscribed_quotes.len())
77 .finish()
78 }
79}
80
81impl Deref for OrderEmulator {
82 type Target = DataActorCore;
83
84 fn deref(&self) -> &Self::Target {
85 &self.actor
86 }
87}
88
89impl DerefMut for OrderEmulator {
90 fn deref_mut(&mut self) -> &mut Self::Target {
91 &mut self.actor
92 }
93}
94
95impl OrderEmulator {
96 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
97 let config = DataActorConfig {
98 actor_id: Some(ActorId::from("OrderEmulator")),
99 ..Default::default()
100 };
101
102 let active_local = true;
103 let manager =
104 OrderManager::new(clock.clone(), cache.clone(), active_local, None, None, None);
105
106 Self {
107 actor: DataActorCore::new(config),
108 clock,
109 cache,
110 manager,
111 matching_cores: AHashMap::new(),
112 subscribed_quotes: AHashSet::new(),
113 subscribed_trades: AHashSet::new(),
114 subscribed_strategies: AHashSet::new(),
115 monitored_positions: AHashSet::new(),
116 on_event_handler: None,
117 self_ref: None,
118 }
119 }
120
121 pub fn set_self_ref(&mut self, self_ref: WeakCell<Self>) {
123 self.self_ref = Some(self_ref);
124 }
125
126 pub fn register(
132 &mut self,
133 trader_id: TraderId,
134 clock: Rc<RefCell<dyn Clock>>,
135 cache: Rc<RefCell<Cache>>,
136 ) -> anyhow::Result<()> {
137 self.actor.register(trader_id, clock, cache)
138 }
139
140 pub fn set_on_event_handler(&mut self, handler: TypedHandler<OrderEventAny>) {
141 self.on_event_handler = Some(handler);
142 }
143
144 pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
146 self.manager.set_submit_order_handler(handler);
147 }
148
149 pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
151 self.manager.set_cancel_order_handler(handler);
152 }
153
154 pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
156 self.manager.set_modify_order_handler(handler);
157 }
158
159 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
161 self.manager.cache_submit_order_command(command);
162 }
163
164 fn subscribe_quotes_for_instrument(&mut self, instrument_id: InstrumentId) {
166 let Some(self_ref) = self.self_ref.clone() else {
167 log::warn!("Cannot subscribe to quotes: self_ref not set");
168 return;
169 };
170
171 let topic = get_quotes_topic(instrument_id);
172 let handler = TypedHandler::from(move |quote: &QuoteTick| {
173 if let Some(emulator) = self_ref.upgrade() {
174 emulator.borrow_mut().on_quote_tick(*quote);
175 }
176 });
177
178 self.actor
179 .subscribe_quotes(topic, handler, instrument_id, None, None);
180 }
181
182 fn subscribe_trades_for_instrument(&mut self, instrument_id: InstrumentId) {
184 let Some(self_ref) = self.self_ref.clone() else {
185 log::warn!("Cannot subscribe to trades: self_ref not set");
186 return;
187 };
188
189 let topic = get_trades_topic(instrument_id);
190 let handler = TypedHandler::from(move |trade: &TradeTick| {
191 if let Some(emulator) = self_ref.upgrade() {
192 emulator.borrow_mut().on_trade_tick(*trade);
193 }
194 });
195
196 self.actor
197 .subscribe_trades(topic, handler, instrument_id, None, None);
198 }
199
200 #[must_use]
201 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
202 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
203 quotes.sort();
204 quotes
205 }
206
207 #[must_use]
208 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
209 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
210 trades.sort();
211 trades
212 }
213
214 #[must_use]
215 pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
216 self.manager.get_submit_order_commands()
217 }
218
219 #[must_use]
220 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
221 self.matching_cores.get(instrument_id).cloned()
222 }
223
224 pub fn on_start(&mut self) -> anyhow::Result<()> {
234 let emulated_orders: Vec<OrderAny> = self
235 .cache
236 .borrow()
237 .orders_emulated(None, None, None, None, None)
238 .into_iter()
239 .cloned()
240 .collect();
241
242 if emulated_orders.is_empty() {
243 log::error!("No emulated orders to reactivate");
244 return Ok(());
245 }
246
247 for order in emulated_orders {
248 if !matches!(
249 order.status(),
250 OrderStatus::Initialized | OrderStatus::Emulated
251 ) {
252 continue; }
254
255 if let Some(parent_order_id) = &order.parent_order_id() {
256 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
257 order.clone()
258 } else {
259 log::error!("Cannot handle order: parent {parent_order_id} not found");
260 continue;
261 };
262
263 let is_position_closed = parent_order
264 .position_id()
265 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
266 if parent_order.is_closed() && is_position_closed {
267 self.manager.cancel_order(&order);
268 continue; }
270
271 if parent_order.contingency_type() == Some(ContingencyType::Oto)
272 && (parent_order.is_active_local()
273 || parent_order.filled_qty() == Quantity::zero(0))
274 {
275 continue; }
277 }
278
279 let position_id = self
280 .cache
281 .borrow()
282 .position_id(&order.client_order_id())
283 .copied();
284 let client_id = self
285 .cache
286 .borrow()
287 .client_id(&order.client_order_id())
288 .copied();
289
290 let command = SubmitOrder::new(
291 order.trader_id(),
292 client_id,
293 order.strategy_id(),
294 order.instrument_id(),
295 order.client_order_id(),
296 order.init_event().clone(),
297 order.exec_algorithm_id(),
298 position_id,
299 None, UUID4::new(),
301 self.clock.borrow().timestamp_ns(),
302 );
303
304 self.handle_submit_order(command);
305 }
306
307 Ok(())
308 }
309
310 pub fn on_event(&mut self, event: OrderEventAny) {
314 log::info!("{RECV}{EVT} {event}");
315
316 self.manager.handle_event(event.clone());
317
318 if let Some(order) = self.cache.borrow().order(&event.client_order_id())
319 && order.is_closed()
320 && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
321 && let Err(e) = matching_core.delete_order(event.client_order_id())
322 {
323 log::error!("Error deleting order: {e}");
324 }
325 }
327
    /// Stop hook; intentionally does nothing.
    pub const fn on_stop(&self) {}
329
330 pub fn on_reset(&mut self) {
331 self.manager.reset();
332 self.matching_cores.clear();
333 }
334
    /// Dispose hook; intentionally does nothing.
    pub const fn on_dispose(&self) {}
336
337 pub fn execute(&mut self, command: TradingCommand) {
338 log::info!("{RECV}{CMD} {command}");
339
340 match command {
341 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
342 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
343 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
344 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
345 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
346 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
347 }
348 }
349
350 fn create_matching_core(
351 &mut self,
352 instrument_id: InstrumentId,
353 price_increment: Price,
354 ) -> OrderMatchingCore {
355 let matching_core =
356 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
357 self.matching_cores
358 .insert(instrument_id, matching_core.clone());
359 log::info!("Creating matching core for {instrument_id:?}");
360 matching_core
361 }
362
363 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
367 let client_order_id = command.client_order_id;
368
369 let mut order = self
370 .cache
371 .borrow()
372 .order(&client_order_id)
373 .cloned()
374 .expect("order must exist in cache");
375
376 let emulation_trigger = order.emulation_trigger();
377
378 assert_ne!(
379 emulation_trigger,
380 Some(TriggerType::NoTrigger),
381 "order.emulation_trigger must not be TriggerType::NoTrigger"
382 );
383 assert!(
384 self.manager
385 .get_submit_order_commands()
386 .contains_key(&client_order_id),
387 "client_order_id must be in submit_order_commands"
388 );
389
390 if !matches!(
391 emulation_trigger,
392 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
393 ) {
394 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
395 self.manager.cancel_order(&order);
396 return;
397 }
398
399 self.check_monitoring(command.strategy_id, command.position_id);
400
401 let trigger_instrument_id = order
403 .trigger_instrument_id()
404 .unwrap_or_else(|| order.instrument_id());
405
406 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
407
408 let mut matching_core = if let Some(core) = matching_core {
409 core
410 } else {
411 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
413 let synthetic = self
414 .cache
415 .borrow()
416 .synthetic(&trigger_instrument_id)
417 .cloned();
418 if let Some(synthetic) = synthetic {
419 (synthetic.id, synthetic.price_increment)
420 } else {
421 log::error!(
422 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
423 );
424 self.manager.cancel_order(&order);
425 return;
426 }
427 } else {
428 let instrument = self
429 .cache
430 .borrow()
431 .instrument(&trigger_instrument_id)
432 .cloned();
433 if let Some(instrument) = instrument {
434 (instrument.id(), instrument.price_increment())
435 } else {
436 log::error!(
437 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
438 );
439 self.manager.cancel_order(&order);
440 return;
441 }
442 };
443
444 self.create_matching_core(instrument_id, price_increment)
445 };
446
447 if matches!(
449 order.order_type(),
450 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
451 ) {
452 self.update_trailing_stop_order(&mut order);
453 if order.trigger_price().is_none() {
454 log::error!(
455 "Cannot handle trailing stop order with no trigger_price and no market updates"
456 );
457
458 self.manager.cancel_order(&order);
459 return;
460 }
461 }
462
463 self.manager.cache_submit_order_command(command);
465
466 let match_info = OrderMatchInfo::new(
468 order.client_order_id(),
469 order.order_side().as_specified(),
470 order.order_type(),
471 order.trigger_price(),
472 order.price(),
473 true, );
475 matching_core.match_order(&match_info);
476
477 match emulation_trigger.unwrap() {
479 TriggerType::Default | TriggerType::BidAsk => {
480 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
481 self.subscribe_quotes_for_instrument(trigger_instrument_id);
482 self.subscribed_quotes.insert(trigger_instrument_id);
483 }
484 }
485 TriggerType::LastPrice => {
486 if !self.subscribed_trades.contains(&trigger_instrument_id) {
487 self.subscribe_trades_for_instrument(trigger_instrument_id);
488 self.subscribed_trades.insert(trigger_instrument_id);
489 }
490 }
491 _ => {
492 log::error!("Invalid TriggerType: {emulation_trigger:?}");
493 return;
494 }
495 }
496
497 if !self
499 .manager
500 .get_submit_order_commands()
501 .contains_key(&order.client_order_id())
502 {
503 return; }
505
506 matching_core.add_order(match_info);
508
509 if order.status() == OrderStatus::Initialized {
511 let event = OrderEmulated::new(
512 order.trader_id(),
513 order.strategy_id(),
514 order.instrument_id(),
515 order.client_order_id(),
516 UUID4::new(),
517 self.clock.borrow().timestamp_ns(),
518 self.clock.borrow().timestamp_ns(),
519 );
520
521 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
522 log::error!("Cannot apply order event: {e:?}");
523 return;
524 }
525
526 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
527 log::error!("Cannot update order: {e:?}");
528 return;
529 }
530
531 self.manager.send_risk_event(OrderEventAny::Emulated(event));
532
533 msgbus::publish_order_event(
534 format!("events.order.{}", order.strategy_id()).into(),
535 &OrderEventAny::Emulated(event),
536 );
537 }
538
539 self.matching_cores
541 .insert(trigger_instrument_id, matching_core);
542
543 log::info!("Emulating {order}");
544 }
545
546 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
547 self.check_monitoring(command.strategy_id, command.position_id);
548
549 let orders: Vec<OrderAny> = self
550 .cache
551 .borrow()
552 .orders_for_ids(&command.order_list.client_order_ids, &command);
553
554 for order in &orders {
555 if let Some(parent_order_id) = order.parent_order_id() {
556 let cache = self.cache.borrow();
557 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
558 parent_order
559 } else {
560 log::error!("Parent order for {} not found", order.client_order_id());
561 continue;
562 };
563
564 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
565 continue; }
567 }
568
569 if let Err(e) =
570 self.manager
571 .create_new_submit_order(order, command.position_id, command.client_id)
572 {
573 log::error!("Error creating new submit order: {e}");
574 }
575 }
576 }
577
578 fn handle_modify_order(&mut self, command: ModifyOrder) {
579 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
580 let price = match command.price {
581 Some(price) => Some(price),
582 None => order.price(),
583 };
584
585 let trigger_price = match command.trigger_price {
586 Some(trigger_price) => Some(trigger_price),
587 None => order.trigger_price(),
588 };
589
590 let ts_now = self.clock.borrow().timestamp_ns();
591 let event = OrderUpdated::new(
592 order.trader_id(),
593 order.strategy_id(),
594 order.instrument_id(),
595 order.client_order_id(),
596 command.quantity.unwrap_or(order.quantity()),
597 UUID4::new(),
598 ts_now,
599 ts_now,
600 false,
601 order.venue_order_id(),
602 order.account_id(),
603 price,
604 trigger_price,
605 None,
606 );
607
608 self.manager.send_exec_event(OrderEventAny::Updated(event));
609
610 let trigger_instrument_id = order
611 .trigger_instrument_id()
612 .unwrap_or_else(|| order.instrument_id());
613
614 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
615 let match_info = OrderMatchInfo::new(
616 order.client_order_id(),
617 order.order_side().as_specified(),
618 order.order_type(),
619 trigger_price,
620 price,
621 true, );
623 matching_core.match_order(&match_info);
624 } else {
625 log::error!(
626 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
627 );
628 }
629 } else {
630 log::error!("Cannot modify order: {} not found", command.client_order_id);
631 }
632 }
633
634 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
635 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
636 order.clone()
637 } else {
638 log::error!("Cannot cancel order: {} not found", command.client_order_id);
639 return;
640 };
641
642 let trigger_instrument_id = order
643 .trigger_instrument_id()
644 .unwrap_or_else(|| order.instrument_id());
645
646 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
647 core
648 } else {
649 self.manager.cancel_order(&order);
650 return;
651 };
652
653 if !matching_core.order_exists(order.client_order_id())
654 && order.is_open()
655 && !order.is_pending_cancel()
656 {
657 self.manager
659 .send_exec_command(TradingCommand::CancelOrder(command));
660 } else {
661 self.manager.cancel_order(&order);
662 }
663 }
664
665 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
666 let instrument_id = command.instrument_id;
667 let matching_core = match self.matching_cores.get(&instrument_id) {
668 Some(core) => core,
669 None => return, };
671
672 let orders_to_cancel = match command.order_side {
673 OrderSide::NoOrderSide => {
674 let mut all_orders = Vec::new();
676 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
677 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
678 all_orders
679 }
680 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
681 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
682 };
683
684 for match_info in orders_to_cancel {
686 if let Some(order) = self
687 .cache
688 .borrow()
689 .order(&match_info.client_order_id)
690 .cloned()
691 {
692 self.manager.cancel_order(&order);
693 }
694 }
695 }
696
697 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
698 log::info!(
699 "Updating order {} quantity to {new_quantity}",
700 order.client_order_id(),
701 );
702
703 let ts_now = self.clock.borrow().timestamp_ns();
704 let event = OrderUpdated::new(
705 order.trader_id(),
706 order.strategy_id(),
707 order.instrument_id(),
708 order.client_order_id(),
709 new_quantity,
710 UUID4::new(),
711 ts_now,
712 ts_now,
713 false,
714 None,
715 order.account_id(),
716 None,
717 None,
718 None,
719 );
720
721 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
722 log::error!("Cannot apply order event: {e:?}");
723 return;
724 }
725 if let Err(e) = self.cache.borrow_mut().update_order(order) {
726 log::error!("Cannot update order: {e:?}");
727 return;
728 }
729
730 self.manager.send_risk_event(OrderEventAny::Updated(event));
731 }
732
733 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
734 log::debug!("Processing {deltas:?}");
735
736 let instrument_id = &deltas.instrument_id;
737 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
738 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
739 let best_bid = book.best_bid_price();
740 let best_ask = book.best_ask_price();
741
742 if let Some(best_bid) = best_bid {
743 matching_core.set_bid_raw(best_bid);
744 }
745
746 if let Some(best_ask) = best_ask {
747 matching_core.set_ask_raw(best_ask);
748 }
749 } else {
750 log::error!(
751 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
752 deltas.instrument_id
753 );
754 }
755
756 self.iterate_orders(instrument_id);
757 } else {
758 log::error!(
759 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
760 deltas.instrument_id
761 );
762 }
763 }
764
765 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
766 log::debug!("Processing {quote}:?");
767
768 let instrument_id = "e.instrument_id;
769 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
770 matching_core.set_bid_raw(quote.bid_price);
771 matching_core.set_ask_raw(quote.ask_price);
772
773 self.iterate_orders(instrument_id);
774 } else {
775 log::error!(
776 "Cannot handle `QuoteTick`: no matching core for instrument {}",
777 quote.instrument_id
778 );
779 }
780 }
781
782 pub fn on_trade_tick(&mut self, trade: TradeTick) {
783 log::debug!("Processing {trade:?}");
784
785 let instrument_id = &trade.instrument_id;
786 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
787 matching_core.set_last_raw(trade.price);
788 if !self.subscribed_quotes.contains(instrument_id) {
789 matching_core.set_bid_raw(trade.price);
790 matching_core.set_ask_raw(trade.price);
791 }
792
793 self.iterate_orders(instrument_id);
794 } else {
795 log::error!(
796 "Cannot handle `TradeTick`: no matching core for instrument {}",
797 trade.instrument_id
798 );
799 }
800 }
801
802 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
803 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
804 matching_core.iterate();
805
806 matching_core.get_orders()
807 } else {
808 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
809 return;
810 };
811
812 for match_info in orders {
813 if !matches!(
814 match_info.order_type,
815 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
816 ) {
817 continue;
818 }
819
820 let mut order = match self
821 .cache
822 .borrow()
823 .order(&match_info.client_order_id)
824 .cloned()
825 {
826 Some(order) => order,
827 None => continue,
828 };
829
830 if order.is_closed() {
831 continue;
832 }
833
834 self.update_trailing_stop_order(&mut order);
835 }
836 }
837
838 pub fn cancel_order(&mut self, order: &OrderAny) {
842 log::info!("Canceling order {}", order.client_order_id());
843
844 let mut order = order.clone();
845 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
846
847 let trigger_instrument_id = order
848 .trigger_instrument_id()
849 .unwrap_or(order.instrument_id());
850
851 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
852 && let Err(e) = matching_core.delete_order(order.client_order_id())
853 {
854 log::error!("Cannot delete order: {e:?}");
855 }
856
857 self.manager
858 .pop_submit_order_command(order.client_order_id());
859
860 self.cache
861 .borrow_mut()
862 .update_order_pending_cancel_local(&order);
863
864 let ts_now = self.clock.borrow().timestamp_ns();
865 let event = OrderCanceled::new(
866 order.trader_id(),
867 order.strategy_id(),
868 order.instrument_id(),
869 order.client_order_id(),
870 UUID4::new(),
871 ts_now,
872 ts_now,
873 false,
874 order.venue_order_id(),
875 order.account_id(),
876 );
877
878 self.manager.send_exec_event(OrderEventAny::Canceled(event));
879 }
880
881 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
882 if !self.subscribed_strategies.contains(&strategy_id) {
883 if let Some(handler) = &self.on_event_handler {
885 msgbus::subscribe_order_events(
886 format!("events.order.{strategy_id}").into(),
887 handler.clone(),
888 None,
889 );
890 self.subscribed_strategies.insert(strategy_id);
891 log::info!("Subscribed to strategy {strategy_id} order events");
892 }
893 }
894
895 if let Some(position_id) = position_id
896 && !self.monitored_positions.contains(&position_id)
897 {
898 self.monitored_positions.insert(position_id);
899 }
900 }
901
902 fn validate_release(
910 &self,
911 order: &OrderAny,
912 matching_core: &OrderMatchingCore,
913 trigger_instrument_id: InstrumentId,
914 ) -> Option<Price> {
915 let released_price = match order.order_side_specified() {
916 OrderSideSpecified::Buy => matching_core.ask,
917 OrderSideSpecified::Sell => matching_core.bid,
918 };
919
920 if released_price.is_none() {
921 log::warn!(
922 "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
923 order.client_order_id(),
924 );
925 return None;
926 }
927
928 Some(released_price.unwrap())
929 }
930
931 pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
935 let order = match self.cache.borrow().order(&client_order_id).cloned() {
936 Some(order) => order,
937 None => {
938 log::error!(
939 "Cannot trigger stop order: order {client_order_id} not found in cache"
940 );
941 return;
942 }
943 };
944
945 match order.order_type() {
946 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
947 self.fill_limit_order(client_order_id);
948 }
949 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
950 self.fill_market_order(client_order_id);
951 }
952 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
953 }
954 }
955
956 pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
960 let order = match self.cache.borrow().order(&client_order_id).cloned() {
961 Some(order) => order,
962 None => {
963 log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
964 return;
965 }
966 };
967
968 if matches!(order.order_type(), OrderType::Limit) {
969 self.fill_market_order(client_order_id);
970 return;
971 }
972
973 let trigger_instrument_id = order
974 .trigger_instrument_id()
975 .unwrap_or(order.instrument_id());
976
977 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
978 Some(core) => core,
979 None => {
980 log::error!(
981 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
982 );
983 return; }
985 };
986
987 let released_price =
988 match self.validate_release(&order, matching_core, trigger_instrument_id) {
989 Some(price) => price,
990 None => return, };
992
993 let command = match self
994 .manager
995 .pop_submit_order_command(order.client_order_id())
996 {
997 Some(command) => command,
998 None => return, };
1000
1001 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1002 if let Err(e) = matching_core.delete_order(client_order_id) {
1003 log::error!("Error deleting order: {e:?}");
1004 }
1005
1006 let emulation_trigger = TriggerType::NoTrigger;
1007
1008 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
1010 order.trader_id(),
1011 order.strategy_id(),
1012 order.instrument_id(),
1013 order.client_order_id(),
1014 order.order_side(),
1015 order.quantity(),
1016 order.price().unwrap(),
1017 order.time_in_force(),
1018 order.expire_time(),
1019 order.is_post_only(),
1020 order.is_reduce_only(),
1021 order.is_quote_quantity(),
1022 order.display_qty(),
1023 Some(emulation_trigger),
1024 Some(trigger_instrument_id),
1025 order.contingency_type(),
1026 order.order_list_id(),
1027 order.linked_order_ids().map(Vec::from),
1028 order.parent_order_id(),
1029 order.exec_algorithm_id(),
1030 order.exec_algorithm_params().cloned(),
1031 order.exec_spawn_id(),
1032 order.tags().map(Vec::from),
1033 UUID4::new(),
1034 self.clock.borrow().timestamp_ns(),
1035 ) {
1036 transformed
1037 } else {
1038 log::error!("Cannot create limit order");
1039 return;
1040 };
1041 transformed.liquidity_side = order.liquidity_side();
1042
1043 let original_events = order.events();
1050
1051 for event in original_events {
1052 transformed.events.insert(0, event.clone());
1053 }
1054
1055 if let Err(e) = self.cache.borrow_mut().add_order(
1056 OrderAny::Limit(transformed.clone()),
1057 command.position_id,
1058 command.client_id,
1059 true,
1060 ) {
1061 log::error!("Failed to add order: {e}");
1062 }
1063
1064 msgbus::publish_order_event(
1065 format!("events.order.{}", order.strategy_id()).into(),
1066 transformed.last_event(),
1067 );
1068
1069 let event = OrderReleased::new(
1070 order.trader_id(),
1071 order.strategy_id(),
1072 order.instrument_id(),
1073 order.client_order_id(),
1074 released_price,
1075 UUID4::new(),
1076 self.clock.borrow().timestamp_ns(),
1077 self.clock.borrow().timestamp_ns(),
1078 );
1079
1080 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1081 log::error!("Failed to apply order event: {e}");
1082 }
1083
1084 if let Err(e) = self
1085 .cache
1086 .borrow_mut()
1087 .update_order(&OrderAny::Limit(transformed.clone()))
1088 {
1089 log::error!("Failed to update order: {e}");
1090 }
1091
1092 self.manager.send_risk_event(OrderEventAny::Released(event));
1093
1094 log::info!("Releasing order {}", order.client_order_id());
1095
1096 msgbus::publish_order_event(
1098 format!("events.order.{}", transformed.strategy_id()).into(),
1099 &OrderEventAny::Released(event),
1100 );
1101
1102 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1103 self.manager.send_algo_command(command, exec_algorithm_id);
1104 } else {
1105 self.manager
1106 .send_exec_command(TradingCommand::SubmitOrder(command));
1107 }
1108 }
1109 }
1110
1111 pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
1115 let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
1116 Some(order) => order,
1117 None => {
1118 log::error!("Cannot fill market order: order {client_order_id} not found in cache");
1119 return;
1120 }
1121 };
1122
1123 let trigger_instrument_id = order
1124 .trigger_instrument_id()
1125 .unwrap_or(order.instrument_id());
1126
1127 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
1128 Some(core) => core,
1129 None => {
1130 log::error!(
1131 "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
1132 );
1133 return; }
1135 };
1136
1137 let released_price =
1138 match self.validate_release(&order, matching_core, trigger_instrument_id) {
1139 Some(price) => price,
1140 None => return, };
1142
1143 let command = self
1144 .manager
1145 .pop_submit_order_command(order.client_order_id())
1146 .expect("invalid operation `fill_market_order` with no command");
1147
1148 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1149 if let Err(e) = matching_core.delete_order(client_order_id) {
1150 log::error!("Cannot delete order: {e:?}");
1151 }
1152
1153 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1154
1155 let mut transformed = MarketOrder::new(
1157 order.trader_id(),
1158 order.strategy_id(),
1159 order.instrument_id(),
1160 order.client_order_id(),
1161 order.order_side(),
1162 order.quantity(),
1163 order.time_in_force(),
1164 UUID4::new(),
1165 self.clock.borrow().timestamp_ns(),
1166 order.is_reduce_only(),
1167 order.is_quote_quantity(),
1168 order.contingency_type(),
1169 order.order_list_id(),
1170 order.linked_order_ids().map(Vec::from),
1171 order.parent_order_id(),
1172 order.exec_algorithm_id(),
1173 order.exec_algorithm_params().cloned(),
1174 order.exec_spawn_id(),
1175 order.tags().map(Vec::from),
1176 );
1177
1178 let original_events = order.events();
1179
1180 for event in original_events {
1181 transformed.events.insert(0, event.clone());
1182 }
1183
1184 if let Err(e) = self.cache.borrow_mut().add_order(
1185 OrderAny::Market(transformed.clone()),
1186 command.position_id,
1187 command.client_id,
1188 true,
1189 ) {
1190 log::error!("Failed to add order: {e}");
1191 }
1192
1193 msgbus::publish_order_event(
1194 format!("events.order.{}", order.strategy_id()).into(),
1195 transformed.last_event(),
1196 );
1197
1198 let ts_now = self.clock.borrow().timestamp_ns();
1199 let event = OrderReleased::new(
1200 order.trader_id(),
1201 order.strategy_id(),
1202 order.instrument_id(),
1203 order.client_order_id(),
1204 released_price,
1205 UUID4::new(),
1206 ts_now,
1207 ts_now,
1208 );
1209
1210 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1211 log::error!("Failed to apply order event: {e}");
1212 }
1213
1214 if let Err(e) = self
1215 .cache
1216 .borrow_mut()
1217 .update_order(&OrderAny::Market(transformed))
1218 {
1219 log::error!("Failed to update order: {e}");
1220 }
1221 self.manager.send_risk_event(OrderEventAny::Released(event));
1222
1223 log::info!("Releasing order {}", order.client_order_id());
1224
1225 msgbus::publish_order_event(
1227 format!("events.order.{}", order.strategy_id()).into(),
1228 &OrderEventAny::Released(event),
1229 );
1230
1231 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1232 self.manager.send_algo_command(command, exec_algorithm_id);
1233 } else {
1234 self.manager
1235 .send_exec_command(TradingCommand::SubmitOrder(command));
1236 }
1237 }
1238 }
1239
1240 #[allow(clippy::too_many_lines)]
1241 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1242 let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1243 log::error!(
1244 "Cannot update trailing-stop order: no matching core for instrument {}",
1245 order.instrument_id()
1246 );
1247 return;
1248 };
1249
1250 let mut bid = matching_core.bid;
1251 let mut ask = matching_core.ask;
1252 let mut last = matching_core.last;
1253
1254 if bid.is_none() || ask.is_none() || last.is_none() {
1255 if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1256 bid.get_or_insert(q.bid_price);
1257 ask.get_or_insert(q.ask_price);
1258 }
1259 if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1260 last.get_or_insert(t.price);
1261 }
1262 }
1263
1264 let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1265 matching_core.price_increment,
1266 order.trigger_price(),
1267 order.activation_price(),
1268 order,
1269 bid,
1270 ask,
1271 last,
1272 ) {
1273 Ok(pair) => pair,
1274 Err(e) => {
1275 log::warn!("Cannot calculate trailing-stop update: {e}");
1276 return;
1277 }
1278 };
1279
1280 if new_trigger_px.is_none() && new_limit_px.is_none() {
1281 return;
1282 }
1283
1284 let ts_now = self.clock.borrow().timestamp_ns();
1285 let update = OrderUpdated::new(
1286 order.trader_id(),
1287 order.strategy_id(),
1288 order.instrument_id(),
1289 order.client_order_id(),
1290 order.quantity(),
1291 UUID4::new(),
1292 ts_now,
1293 ts_now,
1294 false,
1295 order.venue_order_id(),
1296 order.account_id(),
1297 new_limit_px,
1298 new_trigger_px,
1299 None,
1300 );
1301 let wrapped = OrderEventAny::Updated(update);
1302 if let Err(e) = order.apply(wrapped.clone()) {
1303 log::error!("Failed to apply order event: {e}");
1304 return;
1305 }
1306 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1307 log::error!("Failed to update order in cache: {e}");
1308 return;
1309 }
1310 self.manager.send_risk_event(wrapped);
1311 }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316 use std::{cell::RefCell, rc::Rc};
1317
1318 use nautilus_common::{cache::Cache, clock::TestClock};
1319 use nautilus_core::{UUID4, WeakCell};
1320 use nautilus_model::{
1321 data::{QuoteTick, TradeTick},
1322 enums::{AggressorSide, OrderSide, OrderType, TriggerType},
1323 identifiers::{StrategyId, TradeId, TraderId},
1324 instruments::{
1325 CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1326 },
1327 orders::OrderTestBuilder,
1328 types::{Price, Quantity},
1329 };
1330 use rstest::{fixture, rstest};
1331
1332 use super::*;
1333
/// Fixture providing the instrument returned by `crypto_perpetual_ethusdt()`
/// to every test that declares an `instrument` parameter.
#[fixture]
fn instrument() -> CryptoPerpetual {
    crypto_perpetual_ethusdt()
}
1338
1339 #[allow(clippy::type_complexity)]
1340 fn create_emulator() -> (
1341 Rc<RefCell<dyn Clock>>,
1342 Rc<RefCell<Cache>>,
1343 Rc<RefCell<OrderEmulator>>,
1344 ) {
1345 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1346 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
1347 let emulator = Rc::new(RefCell::new(OrderEmulator::new(
1348 clock.clone(),
1349 cache.clone(),
1350 )));
1351
1352 emulator
1354 .borrow_mut()
1355 .register(TraderId::from("TRADER-001"), clock.clone(), cache.clone())
1356 .unwrap();
1357
1358 let self_ref = WeakCell::from(Rc::downgrade(&emulator));
1360 emulator.borrow_mut().set_self_ref(self_ref);
1361
1362 (clock, cache, emulator)
1363 }
1364
1365 fn create_stop_market_order(instrument: &CryptoPerpetual, trigger: TriggerType) -> OrderAny {
1366 OrderTestBuilder::new(OrderType::StopMarket)
1367 .instrument_id(instrument.id())
1368 .side(OrderSide::Buy)
1369 .trigger_price(Price::from("5100.00"))
1370 .quantity(Quantity::from(1))
1371 .emulation_trigger(trigger)
1372 .build()
1373 }
1374
1375 fn create_submit_order(instrument: &CryptoPerpetual, order: &OrderAny) -> SubmitOrder {
1376 SubmitOrder::new(
1377 TraderId::from("TRADER-001"),
1378 None,
1379 StrategyId::from("STRATEGY-001"),
1380 instrument.id(),
1381 order.client_order_id(),
1382 order.init_event().clone(),
1383 None,
1384 None,
1385 None,
1386 UUID4::new(),
1387 0.into(),
1388 )
1389 }
1390
1391 fn create_quote_tick(instrument: &CryptoPerpetual, bid: &str, ask: &str) -> QuoteTick {
1392 QuoteTick::new(
1393 instrument.id(),
1394 Price::from(bid),
1395 Price::from(ask),
1396 Quantity::from(10),
1397 Quantity::from(10),
1398 0.into(),
1399 0.into(),
1400 )
1401 }
1402
1403 fn create_trade_tick(instrument: &CryptoPerpetual, price: &str) -> TradeTick {
1404 TradeTick::new(
1405 instrument.id(),
1406 Price::from(price),
1407 Quantity::from(1),
1408 AggressorSide::Buyer,
1409 TradeId::from("T-001"),
1410 0.into(),
1411 0.into(),
1412 )
1413 }
1414
1415 fn add_instrument_to_cache(cache: &Rc<RefCell<Cache>>, instrument: &CryptoPerpetual) {
1416 cache
1417 .borrow_mut()
1418 .add_instrument(InstrumentAny::CryptoPerpetual(*instrument))
1419 .unwrap();
1420 }
1421
1422 #[rstest]
1423 fn test_subscribed_quotes_initially_empty() {
1424 let (_clock, _cache, emulator) = create_emulator();
1425
1426 assert!(emulator.borrow().subscribed_quotes().is_empty());
1427 }
1428
1429 #[rstest]
1430 fn test_subscribed_trades_initially_empty() {
1431 let (_clock, _cache, emulator) = create_emulator();
1432
1433 assert!(emulator.borrow().subscribed_trades().is_empty());
1434 }
1435
1436 #[rstest]
1437 fn test_get_submit_order_commands_initially_empty() {
1438 let (_clock, _cache, emulator) = create_emulator();
1439
1440 assert!(emulator.borrow().get_submit_order_commands().is_empty());
1441 }
1442
1443 #[rstest]
1444 fn test_get_matching_core_returns_none_when_not_created(instrument: CryptoPerpetual) {
1445 let (_clock, _cache, emulator) = create_emulator();
1446
1447 assert!(
1448 emulator
1449 .borrow()
1450 .get_matching_core(&instrument.id())
1451 .is_none()
1452 );
1453 }
1454
1455 #[rstest]
1456 fn test_create_matching_core(instrument: CryptoPerpetual) {
1457 let (_clock, _cache, emulator) = create_emulator();
1458
1459 emulator
1460 .borrow_mut()
1461 .create_matching_core(instrument.id(), instrument.price_increment);
1462
1463 assert!(
1464 emulator
1465 .borrow()
1466 .get_matching_core(&instrument.id())
1467 .is_some()
1468 );
1469 }
1470
1471 #[rstest]
1472 fn test_on_quote_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1473 let (_clock, _cache, emulator) = create_emulator();
1474 let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1475
1476 emulator.borrow_mut().on_quote_tick(quote);
1477 }
1478
1479 #[rstest]
1480 fn test_on_trade_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1481 let (_clock, _cache, emulator) = create_emulator();
1482 let trade = create_trade_tick(&instrument, "5065.00");
1483
1484 emulator.borrow_mut().on_trade_tick(trade);
1485 }
1486
1487 #[rstest]
1488 fn test_submit_order_bid_ask_trigger_creates_matching_core(instrument: CryptoPerpetual) {
1489 let (_clock, cache, emulator) = create_emulator();
1490 add_instrument_to_cache(&cache, &instrument);
1491 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1492 let command = create_submit_order(&instrument, &order);
1493 cache
1494 .borrow_mut()
1495 .add_order(order, None, None, false)
1496 .unwrap();
1497
1498 emulator
1499 .borrow_mut()
1500 .cache_submit_order_command(command.clone());
1501 emulator.borrow_mut().handle_submit_order(command);
1502
1503 assert!(
1504 emulator
1505 .borrow()
1506 .get_matching_core(&instrument.id())
1507 .is_some()
1508 );
1509 }
1510
1511 #[rstest]
1512 fn test_submit_order_bid_ask_trigger_tracks_quote_subscription(instrument: CryptoPerpetual) {
1513 let (_clock, cache, emulator) = create_emulator();
1514 add_instrument_to_cache(&cache, &instrument);
1515 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1516 let command = create_submit_order(&instrument, &order);
1517 cache
1518 .borrow_mut()
1519 .add_order(order, None, None, false)
1520 .unwrap();
1521
1522 emulator
1523 .borrow_mut()
1524 .cache_submit_order_command(command.clone());
1525 emulator.borrow_mut().handle_submit_order(command);
1526
1527 assert_eq!(emulator.borrow().subscribed_quotes(), vec![instrument.id()]);
1528 assert!(emulator.borrow().subscribed_trades().is_empty());
1529 }
1530
1531 #[rstest]
1532 fn test_submit_order_last_price_trigger_tracks_trade_subscription(instrument: CryptoPerpetual) {
1533 let (_clock, cache, emulator) = create_emulator();
1534 add_instrument_to_cache(&cache, &instrument);
1535 let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1536 let command = create_submit_order(&instrument, &order);
1537 cache
1538 .borrow_mut()
1539 .add_order(order, None, None, false)
1540 .unwrap();
1541
1542 emulator
1543 .borrow_mut()
1544 .cache_submit_order_command(command.clone());
1545 emulator.borrow_mut().handle_submit_order(command);
1546
1547 assert!(emulator.borrow().subscribed_quotes().is_empty());
1548 assert_eq!(emulator.borrow().subscribed_trades(), vec![instrument.id()]);
1549 }
1550
1551 #[rstest]
1552 fn test_submit_order_caches_command(instrument: CryptoPerpetual) {
1553 let (_clock, cache, emulator) = create_emulator();
1554 add_instrument_to_cache(&cache, &instrument);
1555 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1556 let client_order_id = order.client_order_id();
1557 let command = create_submit_order(&instrument, &order);
1558 cache
1559 .borrow_mut()
1560 .add_order(order, None, None, false)
1561 .unwrap();
1562
1563 emulator
1564 .borrow_mut()
1565 .cache_submit_order_command(command.clone());
1566 emulator.borrow_mut().handle_submit_order(command);
1567
1568 let commands = emulator.borrow().get_submit_order_commands();
1569 assert!(commands.contains_key(&client_order_id));
1570 }
1571
1572 #[rstest]
1573 fn test_quote_tick_updates_matching_core_prices(instrument: CryptoPerpetual) {
1574 let (_clock, cache, emulator) = create_emulator();
1575 add_instrument_to_cache(&cache, &instrument);
1576 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1577 let command = create_submit_order(&instrument, &order);
1578 cache
1579 .borrow_mut()
1580 .add_order(order, None, None, false)
1581 .unwrap();
1582 emulator
1583 .borrow_mut()
1584 .cache_submit_order_command(command.clone());
1585 emulator.borrow_mut().handle_submit_order(command);
1586
1587 let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1588 emulator.borrow_mut().on_quote_tick(quote);
1589
1590 let core = emulator
1591 .borrow()
1592 .get_matching_core(&instrument.id())
1593 .unwrap();
1594 assert_eq!(core.bid, Some(Price::from("5060.00")));
1595 assert_eq!(core.ask, Some(Price::from("5070.00")));
1596 }
1597
1598 #[rstest]
1599 fn test_trade_tick_updates_matching_core_last_price(instrument: CryptoPerpetual) {
1600 let (_clock, cache, emulator) = create_emulator();
1601 add_instrument_to_cache(&cache, &instrument);
1602 let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1603 let command = create_submit_order(&instrument, &order);
1604 cache
1605 .borrow_mut()
1606 .add_order(order, None, None, false)
1607 .unwrap();
1608 emulator
1609 .borrow_mut()
1610 .cache_submit_order_command(command.clone());
1611 emulator.borrow_mut().handle_submit_order(command);
1612
1613 let trade = create_trade_tick(&instrument, "5065.00");
1614 emulator.borrow_mut().on_trade_tick(trade);
1615
1616 let core = emulator
1617 .borrow()
1618 .get_matching_core(&instrument.id())
1619 .unwrap();
1620 assert_eq!(core.last, Some(Price::from("5065.00")));
1621 }
1622
1623 #[rstest]
1624 fn test_cancel_order_removes_from_matching_core(instrument: CryptoPerpetual) {
1625 let (_clock, cache, emulator) = create_emulator();
1626 add_instrument_to_cache(&cache, &instrument);
1627 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1628 let command = create_submit_order(&instrument, &order);
1629 cache
1630 .borrow_mut()
1631 .add_order(order.clone(), None, None, false)
1632 .unwrap();
1633 emulator
1634 .borrow_mut()
1635 .cache_submit_order_command(command.clone());
1636 emulator.borrow_mut().handle_submit_order(command);
1637
1638 emulator.borrow_mut().cancel_order(&order);
1639
1640 let core = emulator
1641 .borrow()
1642 .get_matching_core(&instrument.id())
1643 .unwrap();
1644 assert!(core.get_orders().is_empty());
1645 }
1646
1647 #[rstest]
1648 fn test_cancel_order_removes_cached_command(instrument: CryptoPerpetual) {
1649 let (_clock, cache, emulator) = create_emulator();
1650 add_instrument_to_cache(&cache, &instrument);
1651 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1652 let client_order_id = order.client_order_id();
1653 let command = create_submit_order(&instrument, &order);
1654 cache
1655 .borrow_mut()
1656 .add_order(order.clone(), None, None, false)
1657 .unwrap();
1658 emulator
1659 .borrow_mut()
1660 .cache_submit_order_command(command.clone());
1661 emulator.borrow_mut().handle_submit_order(command);
1662
1663 emulator.borrow_mut().cancel_order(&order);
1664
1665 let commands = emulator.borrow().get_submit_order_commands();
1666 assert!(!commands.contains_key(&client_order_id));
1667 }
1668}