nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use ahash::{AHashMap, AHashSet};
19use nautilus_common::{
20    cache::Cache,
21    clock::Clock,
22    logging::{CMD, EVT, RECV},
23    messages::execution::{
24        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
25    },
26    msgbus::{self, handler::ShareableMessageHandler},
27};
28use nautilus_core::UUID4;
29use nautilus_model::{
30    data::{OrderBookDeltas, QuoteTick, TradeTick},
31    enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
32    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
33    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
34    instruments::Instrument,
35    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
36    types::{Price, Quantity},
37};
38
39use crate::{
40    matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
41    trailing::trailing_stop_calculate,
42};
43
/// Holds emulated orders locally in per-instrument matching cores until market data
/// triggers them, delegating command and event bookkeeping to an [`OrderManager`].
pub struct OrderEmulator {
    /// Clock used to timestamp the commands and events this emulator generates.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, instruments, synthetics, books and positions.
    cache: Rc<RefCell<Cache>>,
    /// Manager that caches submit commands and sends commands/events onward.
    manager: OrderManager,
    /// Matching cores keyed by the instrument whose prices trigger the held orders.
    matching_cores: AHashMap<InstrumentId, OrderMatchingCore>,
    /// Instruments recorded as requiring quote data (`Default`/`BidAsk` triggers).
    subscribed_quotes: AHashSet<InstrumentId>,
    /// Instruments recorded as requiring trade data (`LastPrice` triggers).
    subscribed_trades: AHashSet<InstrumentId>,
    /// Strategies whose order and position event topics have been subscribed to.
    subscribed_strategies: AHashSet<StrategyId>,
    /// Position IDs seen on submit commands handled by this emulator.
    monitored_positions: AHashSet<PositionId>,
    /// Handler registered on the message bus for strategy events, if one has been set.
    on_event_handler: Option<ShareableMessageHandler>,
}
55
56impl Debug for OrderEmulator {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct(stringify!(OrderEmulator))
59            .field("cores", &self.matching_cores.len())
60            .field("subscribed_quotes", &self.subscribed_quotes.len())
61            .finish()
62    }
63}
64
65impl OrderEmulator {
66    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
67        // TODO: Impl Actor Trait
68        // self.register_base(portfolio, msgbus, cache, clock);
69
70        let active_local = true;
71        let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
72
73        Self {
74            clock,
75            cache,
76            manager,
77            matching_cores: AHashMap::new(),
78            subscribed_quotes: AHashSet::new(),
79            subscribed_trades: AHashSet::new(),
80            subscribed_strategies: AHashSet::new(),
81            monitored_positions: AHashSet::new(),
82            on_event_handler: None,
83        }
84    }
85
    /// Sets the message handler that is subscribed to a strategy's order and position
    /// event topics the first time that strategy is seen by `check_monitoring`.
    ///
    /// Any previously set handler is replaced.
    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
        self.on_event_handler = Some(handler);
    }
89
90    // TODO: WIP
91    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
92    //     self.manager.set_submit_order_handler(handler);
93    // }
94    //
95    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
96    //     self.manager.set_cancel_order_handler(handler);
97    // }
98    //
99    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
100    //     self.manager.set_modify_order_handler(handler);
101    // }
102
103    #[must_use]
104    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
105        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
106        quotes.sort();
107        quotes
108    }
109
110    #[must_use]
111    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
112        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
113        trades.sort();
114        trades
115    }
116
    /// Returns the submit order commands currently held by the order manager, keyed by
    /// client order ID.
    #[must_use]
    pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
        self.manager.get_submit_order_commands()
    }
121
122    #[must_use]
123    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
124        self.matching_cores.get(instrument_id).cloned()
125    }
126
127    /// Reactivates emulated orders from cache on start.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if no emulated orders are found or processing fails.
132    ///
133    /// # Panics
134    ///
135    /// Panics if a cached client ID cannot be unwrapped.
136    pub fn on_start(&mut self) -> anyhow::Result<()> {
137        let emulated_orders: Vec<OrderAny> = self
138            .cache
139            .borrow()
140            .orders_emulated(None, None, None, None)
141            .into_iter()
142            .cloned()
143            .collect();
144
145        if emulated_orders.is_empty() {
146            log::error!("No emulated orders to reactivate");
147            return Ok(());
148        }
149
150        for order in emulated_orders {
151            if !matches!(
152                order.status(),
153                OrderStatus::Initialized | OrderStatus::Emulated
154            ) {
155                continue; // No longer emulated
156            }
157
158            if let Some(parent_order_id) = &order.parent_order_id() {
159                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
160                    order.clone()
161                } else {
162                    log::error!("Cannot handle order: parent {parent_order_id} not found");
163                    continue;
164                };
165
166                let is_position_closed = parent_order
167                    .position_id()
168                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
169                if parent_order.is_closed() && is_position_closed {
170                    self.manager.cancel_order(&order);
171                    continue; // Parent already closed
172                }
173
174                if parent_order.contingency_type() == Some(ContingencyType::Oto)
175                    && (parent_order.is_active_local()
176                        || parent_order.filled_qty() == Quantity::zero(0))
177                {
178                    continue; // Process contingency order later once parent triggered
179                }
180            }
181
182            let position_id = self
183                .cache
184                .borrow()
185                .position_id(&order.client_order_id())
186                .copied();
187            let client_id = self
188                .cache
189                .borrow()
190                .client_id(&order.client_order_id())
191                .copied();
192
193            let command = SubmitOrder::new(
194                order.trader_id(),
195                client_id,
196                order.strategy_id(),
197                order.instrument_id(),
198                order.clone(),
199                order.exec_algorithm_id(),
200                position_id,
201                None, // params
202                UUID4::new(),
203                self.clock.borrow().timestamp_ns(),
204            );
205
206            self.handle_submit_order(command);
207        }
208
209        Ok(())
210    }
211
212    /// # Panics
213    ///
214    /// Panics if the order cannot be converted to a passive order.
215    pub fn on_event(&mut self, event: OrderEventAny) {
216        log::info!("{RECV}{EVT} {event}");
217
218        self.manager.handle_event(event.clone());
219
220        if let Some(order) = self.cache.borrow().order(&event.client_order_id())
221            && order.is_closed()
222            && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
223            && let Err(e) = matching_core.delete_order(
224                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
225            )
226        {
227            log::error!("Error deleting order: {e}");
228        }
229        // else: Order not in cache yet
230    }
231
    /// Lifecycle hook invoked on stop; currently a no-op.
    pub const fn on_stop(&self) {}
233
    /// Resets the order manager and drops all matching cores.
    ///
    /// The recorded quote/trade/strategy subscriptions and monitored positions are left
    /// untouched.
    pub fn on_reset(&mut self) {
        self.manager.reset();
        self.matching_cores.clear();
    }
238
    /// Lifecycle hook invoked on dispose; currently a no-op.
    pub const fn on_dispose(&self) {}
240
241    pub fn execute(&mut self, command: TradingCommand) {
242        log::info!("{RECV}{CMD} {command}");
243
244        match command {
245            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
246            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
247            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
248            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
249            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
250            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
251        }
252    }
253
254    fn create_matching_core(
255        &mut self,
256        instrument_id: InstrumentId,
257        price_increment: Price,
258    ) -> OrderMatchingCore {
259        let matching_core =
260            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
261        self.matching_cores
262            .insert(instrument_id, matching_core.clone());
263        log::info!("Creating matching core for {instrument_id:?}");
264        matching_core
265    }
266
    /// Takes an order into emulation.
    ///
    /// Steps, in order:
    ///
    /// 1. Validate the emulation trigger; unsupported triggers cancel the order.
    /// 2. Register strategy/position monitoring.
    /// 3. Get (or create) the matching core for the order's trigger instrument; a missing
    ///    instrument or synthetic cancels the order.
    /// 4. For trailing stops, update the order and cancel it if no trigger price results.
    /// 5. Cache the submit command, then test whether the order is immediately marketable.
    /// 6. Record the required data subscription (quotes or trades).
    /// 7. If the command is still cached (the order was not released during matching),
    ///    add the order to the matching core and, for `Initialized` orders, apply and
    ///    publish an `OrderEmulated` event.
    ///
    /// # Panics
    ///
    /// Panics if the emulation trigger type is `NoTrigger`, if the order's client order ID
    /// is not present in the manager's submit order commands, or if the order cannot be
    /// converted to a passive order.
    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
        let mut order = command.order.clone();
        let emulation_trigger = order.emulation_trigger();

        assert_ne!(
            emulation_trigger,
            Some(TriggerType::NoTrigger),
            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
        );
        // NOTE(review): this requires the command to ALREADY be cached by the manager, yet
        // the command is cached again further below and `on_start` builds brand-new
        // commands before calling this method. Confirm whether the intended precondition
        // is "must be in" or "must not be in" `submit_order_commands`.
        assert!(
            self.manager
                .get_submit_order_commands()
                .contains_key(&order.client_order_id()),
            "command.order.client_order_id must be in submit_order_commands"
        );

        // Only these trigger types are supported (`None` is rejected here as well)
        if !matches!(
            emulation_trigger,
            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
        ) {
            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
            self.manager.cancel_order(&order);
            return;
        }

        self.check_monitoring(command.strategy_id, command.position_id);

        // Get or create matching core
        let trigger_instrument_id = order
            .trigger_instrument_id()
            .unwrap_or_else(|| order.instrument_id());

        // Work on a clone of the core; it is written back into the map at the end of this
        // method (early returns below leave the stored core untouched).
        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();

        let mut matching_core = if let Some(core) = matching_core {
            core
        } else {
            // Handle synthetic instruments
            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
                let synthetic = self
                    .cache
                    .borrow()
                    .synthetic(&trigger_instrument_id)
                    .cloned();
                if let Some(synthetic) = synthetic {
                    (synthetic.id, synthetic.price_increment)
                } else {
                    log::error!(
                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
                    );
                    self.manager.cancel_order(&order);
                    return;
                }
            } else {
                let instrument = self
                    .cache
                    .borrow()
                    .instrument(&trigger_instrument_id)
                    .cloned();
                if let Some(instrument) = instrument {
                    (instrument.id(), instrument.price_increment())
                } else {
                    log::error!(
                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
                    );
                    self.manager.cancel_order(&order);
                    return;
                }
            };

            self.create_matching_core(instrument_id, price_increment)
        };

        // Update trailing stop
        if matches!(
            order.order_type(),
            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
        ) {
            self.update_trailing_stop_order(&mut order);
            if order.trigger_price().is_none() {
                log::error!(
                    "Cannot handle trailing stop order with no trigger_price and no market updates"
                );

                self.manager.cancel_order(&order);
                return;
            }
        }

        // Cache command
        self.manager.cache_submit_order_command(command);

        // Check if immediately marketable
        matching_core.match_order(
            &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
            true,
        );

        // Handle data subscriptions
        // The `unwrap` cannot fail: `None` was rejected by the supported-trigger check above.
        match emulation_trigger.unwrap() {
            TriggerType::Default | TriggerType::BidAsk => {
                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
                    if !trigger_instrument_id.is_synthetic() {
                        // TODO: Impl Actor Trait
                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
                    }
                    // TODO: Impl Actor Trait
                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
                    self.subscribed_quotes.insert(trigger_instrument_id);
                }
            }
            TriggerType::LastPrice => {
                if !self.subscribed_trades.contains(&trigger_instrument_id) {
                    // TODO: Impl Actor Trait
                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
                    self.subscribed_trades.insert(trigger_instrument_id);
                }
            }
            // Not reachable after the supported-trigger check above; kept as a safeguard
            _ => {
                log::error!("Invalid TriggerType: {emulation_trigger:?}");
                return;
            }
        }

        // Check if order was already released
        if !self
            .manager
            .get_submit_order_commands()
            .contains_key(&order.client_order_id())
        {
            return; // Already released
        }

        // Hold in matching core
        if let Err(e) = matching_core
            .add_order(PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"))
        {
            log::error!("Cannot add order: {e:?}");
            return;
        }

        // Generate emulated event if needed
        if order.status() == OrderStatus::Initialized {
            let event = OrderEmulated::new(
                order.trader_id(),
                order.strategy_id(),
                order.instrument_id(),
                order.client_order_id(),
                UUID4::new(),
                self.clock.borrow().timestamp_ns(),
                self.clock.borrow().timestamp_ns(),
            );

            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
                log::error!("Cannot apply order event: {e:?}");
                return;
            }

            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
                log::error!("Cannot update order: {e:?}");
                return;
            }

            self.manager.send_risk_event(OrderEventAny::Emulated(event));

            msgbus::publish(
                format!("events.order.{}", order.strategy_id()).into(),
                &OrderEventAny::Emulated(event),
            );
        }

        // Since we are cloning the matching core, we need to insert it back into the original hashmap
        self.matching_cores
            .insert(trigger_instrument_id, matching_core);

        log::info!("Emulating {order}");
    }
447
448    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
449        self.check_monitoring(command.strategy_id, command.position_id);
450
451        for order in &command.order_list.orders {
452            if let Some(parent_order_id) = order.parent_order_id() {
453                let cache = self.cache.borrow();
454                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
455                    parent_order
456                } else {
457                    log::error!("Parent order for {} not found", order.client_order_id());
458                    continue;
459                };
460
461                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
462                    continue; // Process contingency order later once parent triggered
463                }
464            }
465
466            if let Err(e) =
467                self.manager
468                    .create_new_submit_order(order, command.position_id, command.client_id)
469            {
470                log::error!("Error creating new submit order: {e}");
471            }
472        }
473    }
474
475    fn handle_modify_order(&mut self, command: ModifyOrder) {
476        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
477            let price = match command.price {
478                Some(price) => Some(price),
479                None => order.price(),
480            };
481
482            let trigger_price = match command.trigger_price {
483                Some(trigger_price) => Some(trigger_price),
484                None => order.trigger_price(),
485            };
486
487            // Generate event
488            let ts_now = self.clock.borrow().timestamp_ns();
489            let event = OrderUpdated::new(
490                order.trader_id(),
491                order.strategy_id(),
492                order.instrument_id(),
493                order.client_order_id(),
494                command.quantity.unwrap_or(order.quantity()),
495                UUID4::new(),
496                ts_now,
497                ts_now,
498                false,
499                order.venue_order_id(),
500                order.account_id(),
501                price,
502                trigger_price,
503                None,
504            );
505
506            self.manager.send_exec_event(OrderEventAny::Updated(event));
507
508            let trigger_instrument_id = order
509                .trigger_instrument_id()
510                .unwrap_or_else(|| order.instrument_id());
511
512            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
513                matching_core.match_order(
514                    &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
515                    false,
516                );
517            } else {
518                log::error!(
519                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
520                );
521            }
522        } else {
523            log::error!("Cannot modify order: {} not found", command.client_order_id);
524        }
525    }
526
527    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
528        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
529            order.clone()
530        } else {
531            log::error!("Cannot cancel order: {} not found", command.client_order_id);
532            return;
533        };
534
535        let trigger_instrument_id = order
536            .trigger_instrument_id()
537            .unwrap_or_else(|| order.instrument_id());
538
539        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
540            core
541        } else {
542            self.manager.cancel_order(&order);
543            return;
544        };
545
546        if !matching_core.order_exists(order.client_order_id())
547            && order.is_open()
548            && !order.is_pending_cancel()
549        {
550            // Order not held in the emulator
551            self.manager
552                .send_exec_command(TradingCommand::CancelOrder(command));
553        } else {
554            self.manager.cancel_order(&order);
555        }
556    }
557
558    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
559        let instrument_id = command.instrument_id;
560        let matching_core = match self.matching_cores.get(&instrument_id) {
561            Some(core) => core,
562            None => return, // No orders to cancel
563        };
564
565        let orders_to_cancel = match command.order_side {
566            OrderSide::NoOrderSide => {
567                // Get both bid and ask orders
568                let mut all_orders = Vec::new();
569                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
570                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
571                all_orders
572            }
573            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
574            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
575        };
576
577        // Process all orders in a single iteration
578        for order in orders_to_cancel {
579            self.manager.cancel_order(&OrderAny::from(order));
580        }
581    }
582
583    // --------------------------------------------------------------------------------------------
584
585    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
586        log::info!(
587            "Updating order {} quantity to {new_quantity}",
588            order.client_order_id(),
589        );
590
591        // Generate event
592        let ts_now = self.clock.borrow().timestamp_ns();
593        let event = OrderUpdated::new(
594            order.trader_id(),
595            order.strategy_id(),
596            order.instrument_id(),
597            order.client_order_id(),
598            new_quantity,
599            UUID4::new(),
600            ts_now,
601            ts_now,
602            false,
603            None,
604            order.account_id(),
605            None,
606            None,
607            None,
608        );
609
610        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
611            log::error!("Cannot apply order event: {e:?}");
612            return;
613        }
614        if let Err(e) = self.cache.borrow_mut().update_order(order) {
615            log::error!("Cannot update order: {e:?}");
616            return;
617        }
618
619        self.manager.send_risk_event(OrderEventAny::Updated(event));
620    }
621
622    // -----------------------------------------------------------------------------------------------
623
624    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
625        log::debug!("Processing {deltas:?}");
626
627        let instrument_id = &deltas.instrument_id;
628        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
629            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
630                let best_bid = book.best_bid_price();
631                let best_ask = book.best_ask_price();
632
633                if let Some(best_bid) = best_bid {
634                    matching_core.set_bid_raw(best_bid);
635                }
636
637                if let Some(best_ask) = best_ask {
638                    matching_core.set_ask_raw(best_ask);
639                }
640            } else {
641                log::error!(
642                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
643                    deltas.instrument_id
644                );
645            }
646
647            self.iterate_orders(instrument_id);
648        } else {
649            log::error!(
650                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
651                deltas.instrument_id
652            );
653        }
654    }
655
656    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
657        log::debug!("Processing {quote}:?");
658
659        let instrument_id = &quote.instrument_id;
660        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
661            matching_core.set_bid_raw(quote.bid_price);
662            matching_core.set_ask_raw(quote.ask_price);
663
664            self.iterate_orders(instrument_id);
665        } else {
666            log::error!(
667                "Cannot handle `QuoteTick`: no matching core for instrument {}",
668                quote.instrument_id
669            );
670        }
671    }
672
673    pub fn on_trade_tick(&mut self, trade: TradeTick) {
674        log::debug!("Processing {trade:?}");
675
676        let instrument_id = &trade.instrument_id;
677        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
678            matching_core.set_last_raw(trade.price);
679            if !self.subscribed_quotes.contains(instrument_id) {
680                matching_core.set_bid_raw(trade.price);
681                matching_core.set_ask_raw(trade.price);
682            }
683
684            self.iterate_orders(instrument_id);
685        } else {
686            log::error!(
687                "Cannot handle `TradeTick`: no matching core for instrument {}",
688                trade.instrument_id
689            );
690        }
691    }
692
693    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
694        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
695            matching_core.iterate();
696
697            matching_core.get_orders()
698        } else {
699            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
700            return;
701        };
702
703        for order in orders {
704            if order.is_closed() {
705                continue;
706            }
707
708            let mut order: OrderAny = order.clone().into();
709            if matches!(
710                order.order_type(),
711                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
712            ) {
713                self.update_trailing_stop_order(&mut order);
714            }
715        }
716    }
717
718    /// # Panics
719    ///
720    /// Panics if the order cannot be converted to a passive order.
721    pub fn cancel_order(&mut self, order: &OrderAny) {
722        log::info!("Canceling order {}", order.client_order_id());
723
724        let mut order = order.clone();
725        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
726
727        let trigger_instrument_id = order
728            .trigger_instrument_id()
729            .unwrap_or(order.instrument_id());
730
731        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
732            && let Err(e) = matching_core.delete_order(
733                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
734            )
735        {
736            log::error!("Cannot delete order: {e:?}");
737        }
738
739        self.cache
740            .borrow_mut()
741            .update_order_pending_cancel_local(&order);
742
743        // Generate event
744        let ts_now = self.clock.borrow().timestamp_ns();
745        let event = OrderCanceled::new(
746            order.trader_id(),
747            order.strategy_id(),
748            order.instrument_id(),
749            order.client_order_id(),
750            UUID4::new(),
751            ts_now,
752            ts_now,
753            false,
754            order.venue_order_id(),
755            order.account_id(),
756        );
757
758        self.manager.send_exec_event(OrderEventAny::Canceled(event));
759    }
760
761    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
762        if !self.subscribed_strategies.contains(&strategy_id) {
763            // Subscribe to all strategy events
764            if let Some(handler) = &self.on_event_handler {
765                msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
766                msgbus::subscribe_str(
767                    format!("events.position.{strategy_id}"),
768                    handler.clone(),
769                    None,
770                );
771                self.subscribed_strategies.insert(strategy_id);
772                log::info!("Subscribed to strategy {strategy_id} order and position events");
773            }
774        }
775
776        if let Some(position_id) = position_id
777            && !self.monitored_positions.contains(&position_id)
778        {
779            self.monitored_positions.insert(position_id);
780        }
781    }
782
783    /// Validates market data availability for order release.
784    ///
785    /// Returns `Some(released_price)` if market data is available, `None` otherwise.
786    /// Logs appropriate warnings when market data is not yet available.
787    ///
788    /// Does NOT pop the submit order command - caller must do that and handle missing command
789    /// according to their contract (panic for market orders, return for limit orders).
790    fn validate_release(
791        &self,
792        order: &OrderAny,
793        matching_core: &OrderMatchingCore,
794        trigger_instrument_id: InstrumentId,
795    ) -> Option<Price> {
796        let released_price = match order.order_side_specified() {
797            OrderSideSpecified::Buy => matching_core.ask,
798            OrderSideSpecified::Sell => matching_core.bid,
799        };
800
801        if released_price.is_none() {
802            log::warn!(
803                "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
804                order.client_order_id(),
805            );
806            return None;
807        }
808
809        Some(released_price.unwrap())
810    }
811
812    /// # Panics
813    ///
814    /// Panics if the order type is invalid for a stop order.
815    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
816        match order.order_type() {
817            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
818                self.fill_limit_order(order);
819            }
820            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
821                self.fill_market_order(order);
822            }
823            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
824        }
825    }
826
827    /// # Panics
828    ///
829    /// Panics if a limit order has no price.
830    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
831        if matches!(order.order_type(), OrderType::Limit) {
832            self.fill_market_order(order);
833            return;
834        }
835
836        let trigger_instrument_id = order
837            .trigger_instrument_id()
838            .unwrap_or(order.instrument_id());
839
840        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
841            Some(core) => core,
842            None => {
843                log::error!(
844                    "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
845                );
846                return; // Order stays queued for retry
847            }
848        };
849
850        let released_price =
851            match self.validate_release(order, matching_core, trigger_instrument_id) {
852                Some(price) => price,
853                None => return, // Order stays queued for retry
854            };
855
856        let mut command = match self
857            .manager
858            .pop_submit_order_command(order.client_order_id())
859        {
860            Some(command) => command,
861            None => return, // Order already released
862        };
863
864        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
865            if let Err(e) = matching_core.delete_order(
866                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
867            ) {
868                log::error!("Error deleting order: {e:?}");
869            }
870
871            let emulation_trigger = TriggerType::NoTrigger;
872
873            // Transform order
874            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
875                order.trader_id(),
876                order.strategy_id(),
877                order.instrument_id(),
878                order.client_order_id(),
879                order.order_side(),
880                order.quantity(),
881                order.price().unwrap(),
882                order.time_in_force(),
883                order.expire_time(),
884                order.is_post_only(),
885                order.is_reduce_only(),
886                order.is_quote_quantity(),
887                order.display_qty(),
888                Some(emulation_trigger),
889                Some(trigger_instrument_id),
890                order.contingency_type(),
891                order.order_list_id(),
892                order.linked_order_ids().map(Vec::from),
893                order.parent_order_id(),
894                order.exec_algorithm_id(),
895                order.exec_algorithm_params().cloned(),
896                order.exec_spawn_id(),
897                order.tags().map(Vec::from),
898                UUID4::new(),
899                self.clock.borrow().timestamp_ns(),
900            ) {
901                transformed
902            } else {
903                log::error!("Cannot create limit order");
904                return;
905            };
906            transformed.liquidity_side = order.liquidity_side();
907
908            // TODO: fix
909            // let triggered_price = order.trigger_price();
910            // if triggered_price.is_some() {
911            //     transformed.trigger_price() = (triggered_price.unwrap());
912            // }
913
914            let original_events = order.events();
915
916            for event in original_events {
917                transformed.events.insert(0, event.clone());
918            }
919
920            if let Err(e) = self.cache.borrow_mut().add_order(
921                OrderAny::Limit(transformed.clone()),
922                command.position_id,
923                command.client_id,
924                true,
925            ) {
926                log::error!("Failed to add order: {e}");
927            }
928
929            // Replace commands order with transformed order
930            command.order = OrderAny::Limit(transformed.clone());
931
932            msgbus::publish(
933                format!("events.order.{}", order.strategy_id()).into(),
934                transformed.last_event(),
935            );
936
937            // Generate event
938            let event = OrderReleased::new(
939                order.trader_id(),
940                order.strategy_id(),
941                order.instrument_id(),
942                order.client_order_id(),
943                released_price,
944                UUID4::new(),
945                self.clock.borrow().timestamp_ns(),
946                self.clock.borrow().timestamp_ns(),
947            );
948
949            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
950                log::error!("Failed to apply order event: {e}");
951            }
952
953            if let Err(e) = self
954                .cache
955                .borrow_mut()
956                .update_order(&OrderAny::Limit(transformed.clone()))
957            {
958                log::error!("Failed to update order: {e}");
959            }
960
961            self.manager.send_risk_event(OrderEventAny::Released(event));
962
963            log::info!("Releasing order {}", order.client_order_id());
964
965            // Publish event
966            msgbus::publish(
967                format!("events.order.{}", transformed.strategy_id()).into(),
968                &OrderEventAny::Released(event),
969            );
970
971            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
972                self.manager.send_algo_command(command, exec_algorithm_id);
973            } else {
974                self.manager
975                    .send_exec_command(TradingCommand::SubmitOrder(command));
976            }
977        }
978    }
979
980    /// # Panics
981    ///
982    /// Panics if a market order command is missing.
983    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
984        let trigger_instrument_id = order
985            .trigger_instrument_id()
986            .unwrap_or(order.instrument_id());
987
988        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
989            Some(core) => core,
990            None => {
991                log::error!(
992                    "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
993                );
994                return; // Order stays queued for retry
995            }
996        };
997
998        let released_price =
999            match self.validate_release(order, matching_core, trigger_instrument_id) {
1000                Some(price) => price,
1001                None => return, // Order stays queued for retry
1002            };
1003
1004        let mut command = self
1005            .manager
1006            .pop_submit_order_command(order.client_order_id())
1007            .expect("invalid operation `fill_market_order` with no command");
1008
1009        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1010            if let Err(e) = matching_core.delete_order(
1011                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1012            ) {
1013                log::error!("Cannot delete order: {e:?}");
1014            }
1015
1016            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1017
1018            // Transform order
1019            let mut transformed = MarketOrder::new(
1020                order.trader_id(),
1021                order.strategy_id(),
1022                order.instrument_id(),
1023                order.client_order_id(),
1024                order.order_side(),
1025                order.quantity(),
1026                order.time_in_force(),
1027                UUID4::new(),
1028                self.clock.borrow().timestamp_ns(),
1029                order.is_reduce_only(),
1030                order.is_quote_quantity(),
1031                order.contingency_type(),
1032                order.order_list_id(),
1033                order.linked_order_ids().map(Vec::from),
1034                order.parent_order_id(),
1035                order.exec_algorithm_id(),
1036                order.exec_algorithm_params().cloned(),
1037                order.exec_spawn_id(),
1038                order.tags().map(Vec::from),
1039            );
1040
1041            let original_events = order.events();
1042
1043            for event in original_events {
1044                transformed.events.insert(0, event.clone());
1045            }
1046
1047            if let Err(e) = self.cache.borrow_mut().add_order(
1048                OrderAny::Market(transformed.clone()),
1049                command.position_id,
1050                command.client_id,
1051                true,
1052            ) {
1053                log::error!("Failed to add order: {e}");
1054            }
1055
1056            // Replace commands order with transformed order
1057            command.order = OrderAny::Market(transformed.clone());
1058
1059            msgbus::publish(
1060                format!("events.order.{}", order.strategy_id()).into(),
1061                transformed.last_event(),
1062            );
1063
1064            // Generate event
1065            let ts_now = self.clock.borrow().timestamp_ns();
1066            let event = OrderReleased::new(
1067                order.trader_id(),
1068                order.strategy_id(),
1069                order.instrument_id(),
1070                order.client_order_id(),
1071                released_price,
1072                UUID4::new(),
1073                ts_now,
1074                ts_now,
1075            );
1076
1077            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1078                log::error!("Failed to apply order event: {e}");
1079            }
1080
1081            if let Err(e) = self
1082                .cache
1083                .borrow_mut()
1084                .update_order(&OrderAny::Market(transformed))
1085            {
1086                log::error!("Failed to update order: {e}");
1087            }
1088            self.manager.send_risk_event(OrderEventAny::Released(event));
1089
1090            log::info!("Releasing order {}", order.client_order_id());
1091
1092            // Publish event
1093            msgbus::publish(
1094                format!("events.order.{}", order.strategy_id()).into(),
1095                &OrderEventAny::Released(event),
1096            );
1097
1098            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1099                self.manager.send_algo_command(command, exec_algorithm_id);
1100            } else {
1101                self.manager
1102                    .send_exec_command(TradingCommand::SubmitOrder(command));
1103            }
1104        }
1105    }
1106
1107    #[allow(clippy::too_many_lines)]
1108    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1109        let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1110            log::error!(
1111                "Cannot update trailing-stop order: no matching core for instrument {}",
1112                order.instrument_id()
1113            );
1114            return;
1115        };
1116
1117        let mut bid = matching_core.bid;
1118        let mut ask = matching_core.ask;
1119        let mut last = matching_core.last;
1120
1121        if bid.is_none() || ask.is_none() || last.is_none() {
1122            if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1123                bid.get_or_insert(q.bid_price);
1124                ask.get_or_insert(q.ask_price);
1125            }
1126            if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1127                last.get_or_insert(t.price);
1128            }
1129        }
1130
1131        let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1132            matching_core.price_increment,
1133            order.trigger_price(),
1134            order.activation_price(),
1135            order,
1136            bid,
1137            ask,
1138            last,
1139        ) {
1140            Ok(pair) => pair,
1141            Err(e) => {
1142                log::warn!("Cannot calculate trailing-stop update: {e}");
1143                return;
1144            }
1145        };
1146
1147        if new_trigger_px.is_none() && new_limit_px.is_none() {
1148            return;
1149        }
1150
1151        let ts_now = self.clock.borrow().timestamp_ns();
1152        let update = OrderUpdated::new(
1153            order.trader_id(),
1154            order.strategy_id(),
1155            order.instrument_id(),
1156            order.client_order_id(),
1157            order.quantity(),
1158            UUID4::new(),
1159            ts_now,
1160            ts_now,
1161            false,
1162            order.venue_order_id(),
1163            order.account_id(),
1164            new_limit_px,
1165            new_trigger_px,
1166            None,
1167        );
1168        let wrapped = OrderEventAny::Updated(update);
1169        if let Err(e) = order.apply(wrapped.clone()) {
1170            log::error!("Failed to apply order event: {e}");
1171            return;
1172        }
1173        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1174            log::error!("Failed to update order in cache: {e}");
1175            return;
1176        }
1177        self.manager.send_risk_event(wrapped);
1178    }
1179}