Skip to main content

nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    fmt::Debug,
19    ops::{Deref, DerefMut},
20    rc::Rc,
21};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25    actor::{DataActorConfig, DataActorCore},
26    cache::Cache,
27    clock::Clock,
28    logging::{CMD, EVT, RECV},
29    messages::execution::{
30        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
31    },
32    msgbus::{
33        self, TypedHandler,
34        switchboard::{get_quotes_topic, get_trades_topic},
35    },
36};
37use nautilus_core::{UUID4, WeakCell};
38use nautilus_model::{
39    data::{OrderBookDeltas, QuoteTick, TradeTick},
40    enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
41    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
42    identifiers::{ActorId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId},
43    instruments::Instrument,
44    orders::{LimitOrder, MarketOrder, Order, OrderAny},
45    types::{Price, Quantity},
46};
47
48use crate::{
49    matching_core::{OrderMatchInfo, OrderMatchingCore},
50    order_manager::{
51        handlers::{CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny},
52        manager::OrderManager,
53    },
54    trailing::trailing_stop_calculate,
55};
56
57pub struct OrderEmulator {
58    actor: DataActorCore,
59    clock: Rc<RefCell<dyn Clock>>,
60    cache: Rc<RefCell<Cache>>,
61    manager: OrderManager,
62    matching_cores: AHashMap<InstrumentId, OrderMatchingCore>,
63    subscribed_quotes: AHashSet<InstrumentId>,
64    subscribed_trades: AHashSet<InstrumentId>,
65    subscribed_strategies: AHashSet<StrategyId>,
66    monitored_positions: AHashSet<PositionId>,
67    on_event_handler: Option<TypedHandler<OrderEventAny>>,
68    self_ref: Option<WeakCell<Self>>,
69}
70
71impl Debug for OrderEmulator {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct(stringify!(OrderEmulator))
74            .field("actor", &self.actor)
75            .field("cores", &self.matching_cores.len())
76            .field("subscribed_quotes", &self.subscribed_quotes.len())
77            .finish()
78    }
79}
80
81impl Deref for OrderEmulator {
82    type Target = DataActorCore;
83
84    fn deref(&self) -> &Self::Target {
85        &self.actor
86    }
87}
88
89impl DerefMut for OrderEmulator {
90    fn deref_mut(&mut self) -> &mut Self::Target {
91        &mut self.actor
92    }
93}
94
95impl OrderEmulator {
96    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
97        let config = DataActorConfig {
98            actor_id: Some(ActorId::from("OrderEmulator")),
99            ..Default::default()
100        };
101
102        let active_local = true;
103        let manager =
104            OrderManager::new(clock.clone(), cache.clone(), active_local, None, None, None);
105
106        Self {
107            actor: DataActorCore::new(config),
108            clock,
109            cache,
110            manager,
111            matching_cores: AHashMap::new(),
112            subscribed_quotes: AHashSet::new(),
113            subscribed_trades: AHashSet::new(),
114            subscribed_strategies: AHashSet::new(),
115            monitored_positions: AHashSet::new(),
116            on_event_handler: None,
117            self_ref: None,
118        }
119    }
120
121    /// Sets the weak self-reference for creating subscription handlers.
122    pub fn set_self_ref(&mut self, self_ref: WeakCell<Self>) {
123        self.self_ref = Some(self_ref);
124    }
125
126    /// Registers the emulator with the trading system.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if registration fails.
131    pub fn register(
132        &mut self,
133        trader_id: TraderId,
134        clock: Rc<RefCell<dyn Clock>>,
135        cache: Rc<RefCell<Cache>>,
136    ) -> anyhow::Result<()> {
137        self.actor.register(trader_id, clock, cache)
138    }
139
140    pub fn set_on_event_handler(&mut self, handler: TypedHandler<OrderEventAny>) {
141        self.on_event_handler = Some(handler);
142    }
143
144    /// Sets the handler for submit order commands.
145    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
146        self.manager.set_submit_order_handler(handler);
147    }
148
149    /// Sets the handler for cancel order commands.
150    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
151        self.manager.set_cancel_order_handler(handler);
152    }
153
154    /// Sets the handler for modify order commands.
155    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
156        self.manager.set_modify_order_handler(handler);
157    }
158
159    /// Caches a submit order command for emulation tracking.
160    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
161        self.manager.cache_submit_order_command(command);
162    }
163
164    /// Subscribes to quote data for the given instrument.
165    fn subscribe_quotes_for_instrument(&mut self, instrument_id: InstrumentId) {
166        let Some(self_ref) = self.self_ref.clone() else {
167            log::warn!("Cannot subscribe to quotes: self_ref not set");
168            return;
169        };
170
171        let topic = get_quotes_topic(instrument_id);
172        let handler = TypedHandler::from(move |quote: &QuoteTick| {
173            if let Some(emulator) = self_ref.upgrade() {
174                emulator.borrow_mut().on_quote_tick(*quote);
175            }
176        });
177
178        self.actor
179            .subscribe_quotes(topic, handler, instrument_id, None, None);
180    }
181
182    /// Subscribes to trade data for the given instrument.
183    fn subscribe_trades_for_instrument(&mut self, instrument_id: InstrumentId) {
184        let Some(self_ref) = self.self_ref.clone() else {
185            log::warn!("Cannot subscribe to trades: self_ref not set");
186            return;
187        };
188
189        let topic = get_trades_topic(instrument_id);
190        let handler = TypedHandler::from(move |trade: &TradeTick| {
191            if let Some(emulator) = self_ref.upgrade() {
192                emulator.borrow_mut().on_trade_tick(*trade);
193            }
194        });
195
196        self.actor
197            .subscribe_trades(topic, handler, instrument_id, None, None);
198    }
199
200    #[must_use]
201    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
202        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
203        quotes.sort();
204        quotes
205    }
206
207    #[must_use]
208    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
209        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
210        trades.sort();
211        trades
212    }
213
214    #[must_use]
215    pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
216        self.manager.get_submit_order_commands()
217    }
218
219    #[must_use]
220    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
221        self.matching_cores.get(instrument_id).cloned()
222    }
223
224    /// Reactivates emulated orders from cache on start.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if no emulated orders are found or processing fails.
229    ///
230    /// # Panics
231    ///
232    /// Panics if a cached client ID cannot be unwrapped.
233    pub fn on_start(&mut self) -> anyhow::Result<()> {
234        let emulated_orders: Vec<OrderAny> = self
235            .cache
236            .borrow()
237            .orders_emulated(None, None, None, None, None)
238            .into_iter()
239            .cloned()
240            .collect();
241
242        if emulated_orders.is_empty() {
243            log::error!("No emulated orders to reactivate");
244            return Ok(());
245        }
246
247        for order in emulated_orders {
248            if !matches!(
249                order.status(),
250                OrderStatus::Initialized | OrderStatus::Emulated
251            ) {
252                continue; // No longer emulated
253            }
254
255            if let Some(parent_order_id) = &order.parent_order_id() {
256                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
257                    order.clone()
258                } else {
259                    log::error!("Cannot handle order: parent {parent_order_id} not found");
260                    continue;
261                };
262
263                let is_position_closed = parent_order
264                    .position_id()
265                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
266                if parent_order.is_closed() && is_position_closed {
267                    self.manager.cancel_order(&order);
268                    continue; // Parent already closed
269                }
270
271                if parent_order.contingency_type() == Some(ContingencyType::Oto)
272                    && (parent_order.is_active_local()
273                        || parent_order.filled_qty() == Quantity::zero(0))
274                {
275                    continue; // Process contingency order later once parent triggered
276                }
277            }
278
279            let position_id = self
280                .cache
281                .borrow()
282                .position_id(&order.client_order_id())
283                .copied();
284            let client_id = self
285                .cache
286                .borrow()
287                .client_id(&order.client_order_id())
288                .copied();
289
290            let command = SubmitOrder::new(
291                order.trader_id(),
292                client_id,
293                order.strategy_id(),
294                order.instrument_id(),
295                order.client_order_id(),
296                order.init_event().clone(),
297                order.exec_algorithm_id(),
298                position_id,
299                None, // params
300                UUID4::new(),
301                self.clock.borrow().timestamp_ns(),
302            );
303
304            self.handle_submit_order(command);
305        }
306
307        Ok(())
308    }
309
310    /// # Panics
311    ///
312    /// Panics if the order cannot be converted to a passive order.
313    pub fn on_event(&mut self, event: OrderEventAny) {
314        log::info!("{RECV}{EVT} {event}");
315
316        self.manager.handle_event(event.clone());
317
318        if let Some(order) = self.cache.borrow().order(&event.client_order_id())
319            && order.is_closed()
320            && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
321            && let Err(e) = matching_core.delete_order(event.client_order_id())
322        {
323            log::error!("Error deleting order: {e}");
324        }
325        // else: Order not in cache yet
326    }
327
328    pub const fn on_stop(&self) {}
329
330    pub fn on_reset(&mut self) {
331        self.manager.reset();
332        self.matching_cores.clear();
333    }
334
335    pub const fn on_dispose(&self) {}
336
337    pub fn execute(&mut self, command: TradingCommand) {
338        log::info!("{RECV}{CMD} {command}");
339
340        match command {
341            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
342            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
343            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
344            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
345            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
346            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
347        }
348    }
349
350    fn create_matching_core(
351        &mut self,
352        instrument_id: InstrumentId,
353        price_increment: Price,
354    ) -> OrderMatchingCore {
355        let matching_core =
356            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
357        self.matching_cores
358            .insert(instrument_id, matching_core.clone());
359        log::info!("Creating matching core for {instrument_id:?}");
360        matching_core
361    }
362
363    /// # Panics
364    ///
365    /// Panics if the emulation trigger type is `NoTrigger` or if order not in cache.
366    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
367        let client_order_id = command.client_order_id;
368
369        let mut order = self
370            .cache
371            .borrow()
372            .order(&client_order_id)
373            .cloned()
374            .expect("order must exist in cache");
375
376        let emulation_trigger = order.emulation_trigger();
377
378        assert_ne!(
379            emulation_trigger,
380            Some(TriggerType::NoTrigger),
381            "order.emulation_trigger must not be TriggerType::NoTrigger"
382        );
383        assert!(
384            self.manager
385                .get_submit_order_commands()
386                .contains_key(&client_order_id),
387            "client_order_id must be in submit_order_commands"
388        );
389
390        if !matches!(
391            emulation_trigger,
392            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
393        ) {
394            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
395            self.manager.cancel_order(&order);
396            return;
397        }
398
399        self.check_monitoring(command.strategy_id, command.position_id);
400
401        // Get or create matching core
402        let trigger_instrument_id = order
403            .trigger_instrument_id()
404            .unwrap_or_else(|| order.instrument_id());
405
406        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
407
408        let mut matching_core = if let Some(core) = matching_core {
409            core
410        } else {
411            // Handle synthetic instruments
412            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
413                let synthetic = self
414                    .cache
415                    .borrow()
416                    .synthetic(&trigger_instrument_id)
417                    .cloned();
418                if let Some(synthetic) = synthetic {
419                    (synthetic.id, synthetic.price_increment)
420                } else {
421                    log::error!(
422                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
423                    );
424                    self.manager.cancel_order(&order);
425                    return;
426                }
427            } else {
428                let instrument = self
429                    .cache
430                    .borrow()
431                    .instrument(&trigger_instrument_id)
432                    .cloned();
433                if let Some(instrument) = instrument {
434                    (instrument.id(), instrument.price_increment())
435                } else {
436                    log::error!(
437                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
438                    );
439                    self.manager.cancel_order(&order);
440                    return;
441                }
442            };
443
444            self.create_matching_core(instrument_id, price_increment)
445        };
446
447        // Update trailing stop
448        if matches!(
449            order.order_type(),
450            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
451        ) {
452            self.update_trailing_stop_order(&mut order);
453            if order.trigger_price().is_none() {
454                log::error!(
455                    "Cannot handle trailing stop order with no trigger_price and no market updates"
456                );
457
458                self.manager.cancel_order(&order);
459                return;
460            }
461        }
462
463        // Cache command
464        self.manager.cache_submit_order_command(command);
465
466        // Check if immediately marketable
467        let match_info = OrderMatchInfo::new(
468            order.client_order_id(),
469            order.order_side().as_specified(),
470            order.order_type(),
471            order.trigger_price(),
472            order.price(),
473            true, // is_activated
474        );
475        matching_core.match_order(&match_info);
476
477        // Handle data subscriptions
478        match emulation_trigger.unwrap() {
479            TriggerType::Default | TriggerType::BidAsk => {
480                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
481                    self.subscribe_quotes_for_instrument(trigger_instrument_id);
482                    self.subscribed_quotes.insert(trigger_instrument_id);
483                }
484            }
485            TriggerType::LastPrice => {
486                if !self.subscribed_trades.contains(&trigger_instrument_id) {
487                    self.subscribe_trades_for_instrument(trigger_instrument_id);
488                    self.subscribed_trades.insert(trigger_instrument_id);
489                }
490            }
491            _ => {
492                log::error!("Invalid TriggerType: {emulation_trigger:?}");
493                return;
494            }
495        }
496
497        // Check if order was already released
498        if !self
499            .manager
500            .get_submit_order_commands()
501            .contains_key(&order.client_order_id())
502        {
503            return; // Already released
504        }
505
506        // Hold in matching core
507        matching_core.add_order(match_info);
508
509        // Generate emulated event if needed
510        if order.status() == OrderStatus::Initialized {
511            let event = OrderEmulated::new(
512                order.trader_id(),
513                order.strategy_id(),
514                order.instrument_id(),
515                order.client_order_id(),
516                UUID4::new(),
517                self.clock.borrow().timestamp_ns(),
518                self.clock.borrow().timestamp_ns(),
519            );
520
521            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
522                log::error!("Cannot apply order event: {e:?}");
523                return;
524            }
525
526            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
527                log::error!("Cannot update order: {e:?}");
528                return;
529            }
530
531            self.manager.send_risk_event(OrderEventAny::Emulated(event));
532
533            msgbus::publish_order_event(
534                format!("events.order.{}", order.strategy_id()).into(),
535                &OrderEventAny::Emulated(event),
536            );
537        }
538
539        // Since we are cloning the matching core, we need to insert it back into the original hashmap
540        self.matching_cores
541            .insert(trigger_instrument_id, matching_core);
542
543        log::info!("Emulating {order}");
544    }
545
546    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
547        self.check_monitoring(command.strategy_id, command.position_id);
548
549        let orders: Vec<OrderAny> = self
550            .cache
551            .borrow()
552            .orders_for_ids(&command.order_list.client_order_ids, &command);
553
554        for order in &orders {
555            if let Some(parent_order_id) = order.parent_order_id() {
556                let cache = self.cache.borrow();
557                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
558                    parent_order
559                } else {
560                    log::error!("Parent order for {} not found", order.client_order_id());
561                    continue;
562                };
563
564                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
565                    continue; // Process contingency order later once parent triggered
566                }
567            }
568
569            if let Err(e) =
570                self.manager
571                    .create_new_submit_order(order, command.position_id, command.client_id)
572            {
573                log::error!("Error creating new submit order: {e}");
574            }
575        }
576    }
577
578    fn handle_modify_order(&mut self, command: ModifyOrder) {
579        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
580            let price = match command.price {
581                Some(price) => Some(price),
582                None => order.price(),
583            };
584
585            let trigger_price = match command.trigger_price {
586                Some(trigger_price) => Some(trigger_price),
587                None => order.trigger_price(),
588            };
589
590            let ts_now = self.clock.borrow().timestamp_ns();
591            let event = OrderUpdated::new(
592                order.trader_id(),
593                order.strategy_id(),
594                order.instrument_id(),
595                order.client_order_id(),
596                command.quantity.unwrap_or(order.quantity()),
597                UUID4::new(),
598                ts_now,
599                ts_now,
600                false,
601                order.venue_order_id(),
602                order.account_id(),
603                price,
604                trigger_price,
605                None,
606            );
607
608            self.manager.send_exec_event(OrderEventAny::Updated(event));
609
610            let trigger_instrument_id = order
611                .trigger_instrument_id()
612                .unwrap_or_else(|| order.instrument_id());
613
614            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
615                let match_info = OrderMatchInfo::new(
616                    order.client_order_id(),
617                    order.order_side().as_specified(),
618                    order.order_type(),
619                    trigger_price,
620                    price,
621                    true, // is_activated
622                );
623                matching_core.match_order(&match_info);
624            } else {
625                log::error!(
626                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
627                );
628            }
629        } else {
630            log::error!("Cannot modify order: {} not found", command.client_order_id);
631        }
632    }
633
634    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
635        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
636            order.clone()
637        } else {
638            log::error!("Cannot cancel order: {} not found", command.client_order_id);
639            return;
640        };
641
642        let trigger_instrument_id = order
643            .trigger_instrument_id()
644            .unwrap_or_else(|| order.instrument_id());
645
646        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
647            core
648        } else {
649            self.manager.cancel_order(&order);
650            return;
651        };
652
653        if !matching_core.order_exists(order.client_order_id())
654            && order.is_open()
655            && !order.is_pending_cancel()
656        {
657            // Order not held in the emulator
658            self.manager
659                .send_exec_command(TradingCommand::CancelOrder(command));
660        } else {
661            self.manager.cancel_order(&order);
662        }
663    }
664
665    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
666        let instrument_id = command.instrument_id;
667        let matching_core = match self.matching_cores.get(&instrument_id) {
668            Some(core) => core,
669            None => return, // No orders to cancel
670        };
671
672        let orders_to_cancel = match command.order_side {
673            OrderSide::NoOrderSide => {
674                // Get both bid and ask orders
675                let mut all_orders = Vec::new();
676                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
677                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
678                all_orders
679            }
680            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
681            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
682        };
683
684        // Process all orders in a single iteration
685        for match_info in orders_to_cancel {
686            if let Some(order) = self
687                .cache
688                .borrow()
689                .order(&match_info.client_order_id)
690                .cloned()
691            {
692                self.manager.cancel_order(&order);
693            }
694        }
695    }
696
697    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
698        log::info!(
699            "Updating order {} quantity to {new_quantity}",
700            order.client_order_id(),
701        );
702
703        let ts_now = self.clock.borrow().timestamp_ns();
704        let event = OrderUpdated::new(
705            order.trader_id(),
706            order.strategy_id(),
707            order.instrument_id(),
708            order.client_order_id(),
709            new_quantity,
710            UUID4::new(),
711            ts_now,
712            ts_now,
713            false,
714            None,
715            order.account_id(),
716            None,
717            None,
718            None,
719        );
720
721        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
722            log::error!("Cannot apply order event: {e:?}");
723            return;
724        }
725        if let Err(e) = self.cache.borrow_mut().update_order(order) {
726            log::error!("Cannot update order: {e:?}");
727            return;
728        }
729
730        self.manager.send_risk_event(OrderEventAny::Updated(event));
731    }
732
733    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
734        log::debug!("Processing {deltas:?}");
735
736        let instrument_id = &deltas.instrument_id;
737        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
738            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
739                let best_bid = book.best_bid_price();
740                let best_ask = book.best_ask_price();
741
742                if let Some(best_bid) = best_bid {
743                    matching_core.set_bid_raw(best_bid);
744                }
745
746                if let Some(best_ask) = best_ask {
747                    matching_core.set_ask_raw(best_ask);
748                }
749            } else {
750                log::error!(
751                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
752                    deltas.instrument_id
753                );
754            }
755
756            self.iterate_orders(instrument_id);
757        } else {
758            log::error!(
759                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
760                deltas.instrument_id
761            );
762        }
763    }
764
765    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
766        log::debug!("Processing {quote}:?");
767
768        let instrument_id = &quote.instrument_id;
769        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
770            matching_core.set_bid_raw(quote.bid_price);
771            matching_core.set_ask_raw(quote.ask_price);
772
773            self.iterate_orders(instrument_id);
774        } else {
775            log::error!(
776                "Cannot handle `QuoteTick`: no matching core for instrument {}",
777                quote.instrument_id
778            );
779        }
780    }
781
782    pub fn on_trade_tick(&mut self, trade: TradeTick) {
783        log::debug!("Processing {trade:?}");
784
785        let instrument_id = &trade.instrument_id;
786        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
787            matching_core.set_last_raw(trade.price);
788            if !self.subscribed_quotes.contains(instrument_id) {
789                matching_core.set_bid_raw(trade.price);
790                matching_core.set_ask_raw(trade.price);
791            }
792
793            self.iterate_orders(instrument_id);
794        } else {
795            log::error!(
796                "Cannot handle `TradeTick`: no matching core for instrument {}",
797                trade.instrument_id
798            );
799        }
800    }
801
802    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
803        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
804            matching_core.iterate();
805
806            matching_core.get_orders()
807        } else {
808            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
809            return;
810        };
811
812        for match_info in orders {
813            if !matches!(
814                match_info.order_type,
815                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
816            ) {
817                continue;
818            }
819
820            let mut order = match self
821                .cache
822                .borrow()
823                .order(&match_info.client_order_id)
824                .cloned()
825            {
826                Some(order) => order,
827                None => continue,
828            };
829
830            if order.is_closed() {
831                continue;
832            }
833
834            self.update_trailing_stop_order(&mut order);
835        }
836    }
837
838    /// # Panics
839    ///
840    /// Panics if the order cannot be converted to a passive order.
841    pub fn cancel_order(&mut self, order: &OrderAny) {
842        log::info!("Canceling order {}", order.client_order_id());
843
844        let mut order = order.clone();
845        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
846
847        let trigger_instrument_id = order
848            .trigger_instrument_id()
849            .unwrap_or(order.instrument_id());
850
851        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
852            && let Err(e) = matching_core.delete_order(order.client_order_id())
853        {
854            log::error!("Cannot delete order: {e:?}");
855        }
856
857        self.manager
858            .pop_submit_order_command(order.client_order_id());
859
860        self.cache
861            .borrow_mut()
862            .update_order_pending_cancel_local(&order);
863
864        let ts_now = self.clock.borrow().timestamp_ns();
865        let event = OrderCanceled::new(
866            order.trader_id(),
867            order.strategy_id(),
868            order.instrument_id(),
869            order.client_order_id(),
870            UUID4::new(),
871            ts_now,
872            ts_now,
873            false,
874            order.venue_order_id(),
875            order.account_id(),
876        );
877
878        self.manager.send_exec_event(OrderEventAny::Canceled(event));
879    }
880
881    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
882        if !self.subscribed_strategies.contains(&strategy_id) {
883            // Subscribe to strategy order events
884            if let Some(handler) = &self.on_event_handler {
885                msgbus::subscribe_order_events(
886                    format!("events.order.{strategy_id}").into(),
887                    handler.clone(),
888                    None,
889                );
890                self.subscribed_strategies.insert(strategy_id);
891                log::info!("Subscribed to strategy {strategy_id} order events");
892            }
893        }
894
895        if let Some(position_id) = position_id
896            && !self.monitored_positions.contains(&position_id)
897        {
898            self.monitored_positions.insert(position_id);
899        }
900    }
901
902    /// Validates market data availability for order release.
903    ///
904    /// Returns `Some(released_price)` if market data is available, `None` otherwise.
905    /// Logs appropriate warnings when market data is not yet available.
906    ///
907    /// Does NOT pop the submit order command - caller must do that and handle missing command
908    /// according to their contract (panic for market orders, return for limit orders).
909    fn validate_release(
910        &self,
911        order: &OrderAny,
912        matching_core: &OrderMatchingCore,
913        trigger_instrument_id: InstrumentId,
914    ) -> Option<Price> {
915        let released_price = match order.order_side_specified() {
916            OrderSideSpecified::Buy => matching_core.ask,
917            OrderSideSpecified::Sell => matching_core.bid,
918        };
919
920        if released_price.is_none() {
921            log::warn!(
922                "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
923                order.client_order_id(),
924            );
925            return None;
926        }
927
928        Some(released_price.unwrap())
929    }
930
931    /// # Panics
932    ///
933    /// Panics if the order type is invalid for a stop order.
934    pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
935        let order = match self.cache.borrow().order(&client_order_id).cloned() {
936            Some(order) => order,
937            None => {
938                log::error!(
939                    "Cannot trigger stop order: order {client_order_id} not found in cache"
940                );
941                return;
942            }
943        };
944
945        match order.order_type() {
946            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
947                self.fill_limit_order(client_order_id);
948            }
949            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
950                self.fill_market_order(client_order_id);
951            }
952            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
953        }
954    }
955
956    /// # Panics
957    ///
958    /// Panics if a limit order has no price.
959    pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
960        let order = match self.cache.borrow().order(&client_order_id).cloned() {
961            Some(order) => order,
962            None => {
963                log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
964                return;
965            }
966        };
967
968        if matches!(order.order_type(), OrderType::Limit) {
969            self.fill_market_order(client_order_id);
970            return;
971        }
972
973        let trigger_instrument_id = order
974            .trigger_instrument_id()
975            .unwrap_or(order.instrument_id());
976
977        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
978            Some(core) => core,
979            None => {
980                log::error!(
981                    "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
982                );
983                return; // Order stays queued for retry
984            }
985        };
986
987        let released_price =
988            match self.validate_release(&order, matching_core, trigger_instrument_id) {
989                Some(price) => price,
990                None => return, // Order stays queued for retry
991            };
992
993        let command = match self
994            .manager
995            .pop_submit_order_command(order.client_order_id())
996        {
997            Some(command) => command,
998            None => return, // Order already released
999        };
1000
1001        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1002            if let Err(e) = matching_core.delete_order(client_order_id) {
1003                log::error!("Error deleting order: {e:?}");
1004            }
1005
1006            let emulation_trigger = TriggerType::NoTrigger;
1007
1008            // Transform order
1009            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
1010                order.trader_id(),
1011                order.strategy_id(),
1012                order.instrument_id(),
1013                order.client_order_id(),
1014                order.order_side(),
1015                order.quantity(),
1016                order.price().unwrap(),
1017                order.time_in_force(),
1018                order.expire_time(),
1019                order.is_post_only(),
1020                order.is_reduce_only(),
1021                order.is_quote_quantity(),
1022                order.display_qty(),
1023                Some(emulation_trigger),
1024                Some(trigger_instrument_id),
1025                order.contingency_type(),
1026                order.order_list_id(),
1027                order.linked_order_ids().map(Vec::from),
1028                order.parent_order_id(),
1029                order.exec_algorithm_id(),
1030                order.exec_algorithm_params().cloned(),
1031                order.exec_spawn_id(),
1032                order.tags().map(Vec::from),
1033                UUID4::new(),
1034                self.clock.borrow().timestamp_ns(),
1035            ) {
1036                transformed
1037            } else {
1038                log::error!("Cannot create limit order");
1039                return;
1040            };
1041            transformed.liquidity_side = order.liquidity_side();
1042
1043            // TODO: fix
1044            // let triggered_price = order.trigger_price();
1045            // if triggered_price.is_some() {
1046            //     transformed.trigger_price() = (triggered_price.unwrap());
1047            // }
1048
1049            let original_events = order.events();
1050
1051            for event in original_events {
1052                transformed.events.insert(0, event.clone());
1053            }
1054
1055            if let Err(e) = self.cache.borrow_mut().add_order(
1056                OrderAny::Limit(transformed.clone()),
1057                command.position_id,
1058                command.client_id,
1059                true,
1060            ) {
1061                log::error!("Failed to add order: {e}");
1062            }
1063
1064            msgbus::publish_order_event(
1065                format!("events.order.{}", order.strategy_id()).into(),
1066                transformed.last_event(),
1067            );
1068
1069            let event = OrderReleased::new(
1070                order.trader_id(),
1071                order.strategy_id(),
1072                order.instrument_id(),
1073                order.client_order_id(),
1074                released_price,
1075                UUID4::new(),
1076                self.clock.borrow().timestamp_ns(),
1077                self.clock.borrow().timestamp_ns(),
1078            );
1079
1080            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1081                log::error!("Failed to apply order event: {e}");
1082            }
1083
1084            if let Err(e) = self
1085                .cache
1086                .borrow_mut()
1087                .update_order(&OrderAny::Limit(transformed.clone()))
1088            {
1089                log::error!("Failed to update order: {e}");
1090            }
1091
1092            self.manager.send_risk_event(OrderEventAny::Released(event));
1093
1094            log::info!("Releasing order {}", order.client_order_id());
1095
1096            // Publish event
1097            msgbus::publish_order_event(
1098                format!("events.order.{}", transformed.strategy_id()).into(),
1099                &OrderEventAny::Released(event),
1100            );
1101
1102            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1103                self.manager.send_algo_command(command, exec_algorithm_id);
1104            } else {
1105                self.manager
1106                    .send_exec_command(TradingCommand::SubmitOrder(command));
1107            }
1108        }
1109    }
1110
1111    /// # Panics
1112    ///
1113    /// Panics if a market order command is missing.
1114    pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
1115        let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
1116            Some(order) => order,
1117            None => {
1118                log::error!("Cannot fill market order: order {client_order_id} not found in cache");
1119                return;
1120            }
1121        };
1122
1123        let trigger_instrument_id = order
1124            .trigger_instrument_id()
1125            .unwrap_or(order.instrument_id());
1126
1127        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
1128            Some(core) => core,
1129            None => {
1130                log::error!(
1131                    "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
1132                );
1133                return; // Order stays queued for retry
1134            }
1135        };
1136
1137        let released_price =
1138            match self.validate_release(&order, matching_core, trigger_instrument_id) {
1139                Some(price) => price,
1140                None => return, // Order stays queued for retry
1141            };
1142
1143        let command = self
1144            .manager
1145            .pop_submit_order_command(order.client_order_id())
1146            .expect("invalid operation `fill_market_order` with no command");
1147
1148        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1149            if let Err(e) = matching_core.delete_order(client_order_id) {
1150                log::error!("Cannot delete order: {e:?}");
1151            }
1152
1153            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1154
1155            // Transform order
1156            let mut transformed = MarketOrder::new(
1157                order.trader_id(),
1158                order.strategy_id(),
1159                order.instrument_id(),
1160                order.client_order_id(),
1161                order.order_side(),
1162                order.quantity(),
1163                order.time_in_force(),
1164                UUID4::new(),
1165                self.clock.borrow().timestamp_ns(),
1166                order.is_reduce_only(),
1167                order.is_quote_quantity(),
1168                order.contingency_type(),
1169                order.order_list_id(),
1170                order.linked_order_ids().map(Vec::from),
1171                order.parent_order_id(),
1172                order.exec_algorithm_id(),
1173                order.exec_algorithm_params().cloned(),
1174                order.exec_spawn_id(),
1175                order.tags().map(Vec::from),
1176            );
1177
1178            let original_events = order.events();
1179
1180            for event in original_events {
1181                transformed.events.insert(0, event.clone());
1182            }
1183
1184            if let Err(e) = self.cache.borrow_mut().add_order(
1185                OrderAny::Market(transformed.clone()),
1186                command.position_id,
1187                command.client_id,
1188                true,
1189            ) {
1190                log::error!("Failed to add order: {e}");
1191            }
1192
1193            msgbus::publish_order_event(
1194                format!("events.order.{}", order.strategy_id()).into(),
1195                transformed.last_event(),
1196            );
1197
1198            let ts_now = self.clock.borrow().timestamp_ns();
1199            let event = OrderReleased::new(
1200                order.trader_id(),
1201                order.strategy_id(),
1202                order.instrument_id(),
1203                order.client_order_id(),
1204                released_price,
1205                UUID4::new(),
1206                ts_now,
1207                ts_now,
1208            );
1209
1210            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1211                log::error!("Failed to apply order event: {e}");
1212            }
1213
1214            if let Err(e) = self
1215                .cache
1216                .borrow_mut()
1217                .update_order(&OrderAny::Market(transformed))
1218            {
1219                log::error!("Failed to update order: {e}");
1220            }
1221            self.manager.send_risk_event(OrderEventAny::Released(event));
1222
1223            log::info!("Releasing order {}", order.client_order_id());
1224
1225            // Publish event
1226            msgbus::publish_order_event(
1227                format!("events.order.{}", order.strategy_id()).into(),
1228                &OrderEventAny::Released(event),
1229            );
1230
1231            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1232                self.manager.send_algo_command(command, exec_algorithm_id);
1233            } else {
1234                self.manager
1235                    .send_exec_command(TradingCommand::SubmitOrder(command));
1236            }
1237        }
1238    }
1239
1240    #[allow(clippy::too_many_lines)]
1241    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1242        let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1243            log::error!(
1244                "Cannot update trailing-stop order: no matching core for instrument {}",
1245                order.instrument_id()
1246            );
1247            return;
1248        };
1249
1250        let mut bid = matching_core.bid;
1251        let mut ask = matching_core.ask;
1252        let mut last = matching_core.last;
1253
1254        if bid.is_none() || ask.is_none() || last.is_none() {
1255            if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1256                bid.get_or_insert(q.bid_price);
1257                ask.get_or_insert(q.ask_price);
1258            }
1259            if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1260                last.get_or_insert(t.price);
1261            }
1262        }
1263
1264        let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1265            matching_core.price_increment,
1266            order.trigger_price(),
1267            order.activation_price(),
1268            order,
1269            bid,
1270            ask,
1271            last,
1272        ) {
1273            Ok(pair) => pair,
1274            Err(e) => {
1275                log::warn!("Cannot calculate trailing-stop update: {e}");
1276                return;
1277            }
1278        };
1279
1280        if new_trigger_px.is_none() && new_limit_px.is_none() {
1281            return;
1282        }
1283
1284        let ts_now = self.clock.borrow().timestamp_ns();
1285        let update = OrderUpdated::new(
1286            order.trader_id(),
1287            order.strategy_id(),
1288            order.instrument_id(),
1289            order.client_order_id(),
1290            order.quantity(),
1291            UUID4::new(),
1292            ts_now,
1293            ts_now,
1294            false,
1295            order.venue_order_id(),
1296            order.account_id(),
1297            new_limit_px,
1298            new_trigger_px,
1299            None,
1300        );
1301        let wrapped = OrderEventAny::Updated(update);
1302        if let Err(e) = order.apply(wrapped.clone()) {
1303            log::error!("Failed to apply order event: {e}");
1304            return;
1305        }
1306        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1307            log::error!("Failed to update order in cache: {e}");
1308            return;
1309        }
1310        self.manager.send_risk_event(wrapped);
1311    }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316    use std::{cell::RefCell, rc::Rc};
1317
1318    use nautilus_common::{cache::Cache, clock::TestClock};
1319    use nautilus_core::{UUID4, WeakCell};
1320    use nautilus_model::{
1321        data::{QuoteTick, TradeTick},
1322        enums::{AggressorSide, OrderSide, OrderType, TriggerType},
1323        identifiers::{StrategyId, TradeId, TraderId},
1324        instruments::{
1325            CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1326        },
1327        orders::OrderTestBuilder,
1328        types::{Price, Quantity},
1329    };
1330    use rstest::{fixture, rstest};
1331
1332    use super::*;
1333
1334    #[fixture]
1335    fn instrument() -> CryptoPerpetual {
1336        crypto_perpetual_ethusdt()
1337    }
1338
1339    #[allow(clippy::type_complexity)]
1340    fn create_emulator() -> (
1341        Rc<RefCell<dyn Clock>>,
1342        Rc<RefCell<Cache>>,
1343        Rc<RefCell<OrderEmulator>>,
1344    ) {
1345        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1346        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
1347        let emulator = Rc::new(RefCell::new(OrderEmulator::new(
1348            clock.clone(),
1349            cache.clone(),
1350        )));
1351
1352        // Register with trader for subscription support
1353        emulator
1354            .borrow_mut()
1355            .register(TraderId::from("TRADER-001"), clock.clone(), cache.clone())
1356            .unwrap();
1357
1358        // Set self-ref for subscription handlers
1359        let self_ref = WeakCell::from(Rc::downgrade(&emulator));
1360        emulator.borrow_mut().set_self_ref(self_ref);
1361
1362        (clock, cache, emulator)
1363    }
1364
1365    fn create_stop_market_order(instrument: &CryptoPerpetual, trigger: TriggerType) -> OrderAny {
1366        OrderTestBuilder::new(OrderType::StopMarket)
1367            .instrument_id(instrument.id())
1368            .side(OrderSide::Buy)
1369            .trigger_price(Price::from("5100.00"))
1370            .quantity(Quantity::from(1))
1371            .emulation_trigger(trigger)
1372            .build()
1373    }
1374
1375    fn create_submit_order(instrument: &CryptoPerpetual, order: &OrderAny) -> SubmitOrder {
1376        SubmitOrder::new(
1377            TraderId::from("TRADER-001"),
1378            None,
1379            StrategyId::from("STRATEGY-001"),
1380            instrument.id(),
1381            order.client_order_id(),
1382            order.init_event().clone(),
1383            None,
1384            None,
1385            None,
1386            UUID4::new(),
1387            0.into(),
1388        )
1389    }
1390
1391    fn create_quote_tick(instrument: &CryptoPerpetual, bid: &str, ask: &str) -> QuoteTick {
1392        QuoteTick::new(
1393            instrument.id(),
1394            Price::from(bid),
1395            Price::from(ask),
1396            Quantity::from(10),
1397            Quantity::from(10),
1398            0.into(),
1399            0.into(),
1400        )
1401    }
1402
1403    fn create_trade_tick(instrument: &CryptoPerpetual, price: &str) -> TradeTick {
1404        TradeTick::new(
1405            instrument.id(),
1406            Price::from(price),
1407            Quantity::from(1),
1408            AggressorSide::Buyer,
1409            TradeId::from("T-001"),
1410            0.into(),
1411            0.into(),
1412        )
1413    }
1414
1415    fn add_instrument_to_cache(cache: &Rc<RefCell<Cache>>, instrument: &CryptoPerpetual) {
1416        cache
1417            .borrow_mut()
1418            .add_instrument(InstrumentAny::CryptoPerpetual(*instrument))
1419            .unwrap();
1420    }
1421
1422    #[rstest]
1423    fn test_subscribed_quotes_initially_empty() {
1424        let (_clock, _cache, emulator) = create_emulator();
1425
1426        assert!(emulator.borrow().subscribed_quotes().is_empty());
1427    }
1428
1429    #[rstest]
1430    fn test_subscribed_trades_initially_empty() {
1431        let (_clock, _cache, emulator) = create_emulator();
1432
1433        assert!(emulator.borrow().subscribed_trades().is_empty());
1434    }
1435
1436    #[rstest]
1437    fn test_get_submit_order_commands_initially_empty() {
1438        let (_clock, _cache, emulator) = create_emulator();
1439
1440        assert!(emulator.borrow().get_submit_order_commands().is_empty());
1441    }
1442
1443    #[rstest]
1444    fn test_get_matching_core_returns_none_when_not_created(instrument: CryptoPerpetual) {
1445        let (_clock, _cache, emulator) = create_emulator();
1446
1447        assert!(
1448            emulator
1449                .borrow()
1450                .get_matching_core(&instrument.id())
1451                .is_none()
1452        );
1453    }
1454
1455    #[rstest]
1456    fn test_create_matching_core(instrument: CryptoPerpetual) {
1457        let (_clock, _cache, emulator) = create_emulator();
1458
1459        emulator
1460            .borrow_mut()
1461            .create_matching_core(instrument.id(), instrument.price_increment);
1462
1463        assert!(
1464            emulator
1465                .borrow()
1466                .get_matching_core(&instrument.id())
1467                .is_some()
1468        );
1469    }
1470
1471    #[rstest]
1472    fn test_on_quote_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1473        let (_clock, _cache, emulator) = create_emulator();
1474        let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1475
1476        emulator.borrow_mut().on_quote_tick(quote);
1477    }
1478
1479    #[rstest]
1480    fn test_on_trade_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1481        let (_clock, _cache, emulator) = create_emulator();
1482        let trade = create_trade_tick(&instrument, "5065.00");
1483
1484        emulator.borrow_mut().on_trade_tick(trade);
1485    }
1486
1487    #[rstest]
1488    fn test_submit_order_bid_ask_trigger_creates_matching_core(instrument: CryptoPerpetual) {
1489        let (_clock, cache, emulator) = create_emulator();
1490        add_instrument_to_cache(&cache, &instrument);
1491        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1492        let command = create_submit_order(&instrument, &order);
1493        cache
1494            .borrow_mut()
1495            .add_order(order, None, None, false)
1496            .unwrap();
1497
1498        emulator
1499            .borrow_mut()
1500            .cache_submit_order_command(command.clone());
1501        emulator.borrow_mut().handle_submit_order(command);
1502
1503        assert!(
1504            emulator
1505                .borrow()
1506                .get_matching_core(&instrument.id())
1507                .is_some()
1508        );
1509    }
1510
1511    #[rstest]
1512    fn test_submit_order_bid_ask_trigger_tracks_quote_subscription(instrument: CryptoPerpetual) {
1513        let (_clock, cache, emulator) = create_emulator();
1514        add_instrument_to_cache(&cache, &instrument);
1515        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1516        let command = create_submit_order(&instrument, &order);
1517        cache
1518            .borrow_mut()
1519            .add_order(order, None, None, false)
1520            .unwrap();
1521
1522        emulator
1523            .borrow_mut()
1524            .cache_submit_order_command(command.clone());
1525        emulator.borrow_mut().handle_submit_order(command);
1526
1527        assert_eq!(emulator.borrow().subscribed_quotes(), vec![instrument.id()]);
1528        assert!(emulator.borrow().subscribed_trades().is_empty());
1529    }
1530
1531    #[rstest]
1532    fn test_submit_order_last_price_trigger_tracks_trade_subscription(instrument: CryptoPerpetual) {
1533        let (_clock, cache, emulator) = create_emulator();
1534        add_instrument_to_cache(&cache, &instrument);
1535        let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1536        let command = create_submit_order(&instrument, &order);
1537        cache
1538            .borrow_mut()
1539            .add_order(order, None, None, false)
1540            .unwrap();
1541
1542        emulator
1543            .borrow_mut()
1544            .cache_submit_order_command(command.clone());
1545        emulator.borrow_mut().handle_submit_order(command);
1546
1547        assert!(emulator.borrow().subscribed_quotes().is_empty());
1548        assert_eq!(emulator.borrow().subscribed_trades(), vec![instrument.id()]);
1549    }
1550
1551    #[rstest]
1552    fn test_submit_order_caches_command(instrument: CryptoPerpetual) {
1553        let (_clock, cache, emulator) = create_emulator();
1554        add_instrument_to_cache(&cache, &instrument);
1555        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1556        let client_order_id = order.client_order_id();
1557        let command = create_submit_order(&instrument, &order);
1558        cache
1559            .borrow_mut()
1560            .add_order(order, None, None, false)
1561            .unwrap();
1562
1563        emulator
1564            .borrow_mut()
1565            .cache_submit_order_command(command.clone());
1566        emulator.borrow_mut().handle_submit_order(command);
1567
1568        let commands = emulator.borrow().get_submit_order_commands();
1569        assert!(commands.contains_key(&client_order_id));
1570    }
1571
1572    #[rstest]
1573    fn test_quote_tick_updates_matching_core_prices(instrument: CryptoPerpetual) {
1574        let (_clock, cache, emulator) = create_emulator();
1575        add_instrument_to_cache(&cache, &instrument);
1576        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1577        let command = create_submit_order(&instrument, &order);
1578        cache
1579            .borrow_mut()
1580            .add_order(order, None, None, false)
1581            .unwrap();
1582        emulator
1583            .borrow_mut()
1584            .cache_submit_order_command(command.clone());
1585        emulator.borrow_mut().handle_submit_order(command);
1586
1587        let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1588        emulator.borrow_mut().on_quote_tick(quote);
1589
1590        let core = emulator
1591            .borrow()
1592            .get_matching_core(&instrument.id())
1593            .unwrap();
1594        assert_eq!(core.bid, Some(Price::from("5060.00")));
1595        assert_eq!(core.ask, Some(Price::from("5070.00")));
1596    }
1597
1598    #[rstest]
1599    fn test_trade_tick_updates_matching_core_last_price(instrument: CryptoPerpetual) {
1600        let (_clock, cache, emulator) = create_emulator();
1601        add_instrument_to_cache(&cache, &instrument);
1602        let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1603        let command = create_submit_order(&instrument, &order);
1604        cache
1605            .borrow_mut()
1606            .add_order(order, None, None, false)
1607            .unwrap();
1608        emulator
1609            .borrow_mut()
1610            .cache_submit_order_command(command.clone());
1611        emulator.borrow_mut().handle_submit_order(command);
1612
1613        let trade = create_trade_tick(&instrument, "5065.00");
1614        emulator.borrow_mut().on_trade_tick(trade);
1615
1616        let core = emulator
1617            .borrow()
1618            .get_matching_core(&instrument.id())
1619            .unwrap();
1620        assert_eq!(core.last, Some(Price::from("5065.00")));
1621    }
1622
1623    #[rstest]
1624    fn test_cancel_order_removes_from_matching_core(instrument: CryptoPerpetual) {
1625        let (_clock, cache, emulator) = create_emulator();
1626        add_instrument_to_cache(&cache, &instrument);
1627        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1628        let command = create_submit_order(&instrument, &order);
1629        cache
1630            .borrow_mut()
1631            .add_order(order.clone(), None, None, false)
1632            .unwrap();
1633        emulator
1634            .borrow_mut()
1635            .cache_submit_order_command(command.clone());
1636        emulator.borrow_mut().handle_submit_order(command);
1637
1638        emulator.borrow_mut().cancel_order(&order);
1639
1640        let core = emulator
1641            .borrow()
1642            .get_matching_core(&instrument.id())
1643            .unwrap();
1644        assert!(core.get_orders().is_empty());
1645    }
1646
1647    #[rstest]
1648    fn test_cancel_order_removes_cached_command(instrument: CryptoPerpetual) {
1649        let (_clock, cache, emulator) = create_emulator();
1650        add_instrument_to_cache(&cache, &instrument);
1651        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1652        let client_order_id = order.client_order_id();
1653        let command = create_submit_order(&instrument, &order);
1654        cache
1655            .borrow_mut()
1656            .add_order(order.clone(), None, None, false)
1657            .unwrap();
1658        emulator
1659            .borrow_mut()
1660            .cache_submit_order_command(command.clone());
1661        emulator.borrow_mut().handle_submit_order(command);
1662
1663        emulator.borrow_mut().cancel_order(&order);
1664
1665        let commands = emulator.borrow().get_submit_order_commands();
1666        assert!(!commands.contains_key(&client_order_id));
1667    }
1668}