nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::{HashMap, HashSet},
19    fmt::Debug,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    logging::{CMD, EVT, RECV},
27    messages::execution::{
28        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::UUID4;
33use nautilus_model::{
34    data::{OrderBookDeltas, QuoteTick, TradeTick},
35    enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
36    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38    instruments::Instrument,
39    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40    types::{Price, Quantity},
41};
42
43use crate::{
44    matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45    trailing::trailing_stop_calculate,
46};
47
/// Holds the state used to emulate orders locally: shared clock and cache handles, an
/// [`OrderManager`], and one [`OrderMatchingCore`] per instrument.
pub struct OrderEmulator {
    /// Shared clock used to timestamp generated events and commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, instruments, synthetics, and order books.
    cache: Rc<RefCell<Cache>>,
    /// Order manager holding the cached submit order commands and sending commands/events.
    manager: OrderManager,
    /// Matching cores keyed by instrument ID.
    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
    /// Instruments recorded as having a quote subscription.
    subscribed_quotes: HashSet<InstrumentId>,
    /// Instruments recorded as having a trade subscription.
    subscribed_trades: HashSet<InstrumentId>,
    /// Strategies whose order and position event topics have been subscribed to.
    subscribed_strategies: HashSet<StrategyId>,
    /// Position IDs recorded when submit commands are handled.
    monitored_positions: HashSet<PositionId>,
    /// Handler subscribed to strategy event topics; `None` until one is set.
    on_event_handler: Option<ShareableMessageHandler>,
}
59
60impl Debug for OrderEmulator {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct(stringify!(OrderEmulator))
63            .field("cores", &self.matching_cores.len())
64            .field("subscribed_quotes", &self.subscribed_quotes.len())
65            .finish()
66    }
67}
68
69impl OrderEmulator {
70    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71        // TODO: Impl Actor Trait
72        // self.register_base(portfolio, msgbus, cache, clock);
73
74        let active_local = true;
75        let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77        Self {
78            clock,
79            cache,
80            manager,
81            matching_cores: HashMap::new(),
82            subscribed_quotes: HashSet::new(),
83            subscribed_trades: HashSet::new(),
84            subscribed_strategies: HashSet::new(),
85            monitored_positions: HashSet::new(),
86            on_event_handler: None,
87        }
88    }
89
90    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91        self.on_event_handler = Some(handler);
92    }
93
94    // TODO: WIP
95    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
96    //     self.manager.set_submit_order_handler(handler);
97    // }
98    //
99    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
100    //     self.manager.set_cancel_order_handler(handler);
101    // }
102    //
103    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
104    //     self.manager.set_modify_order_handler(handler);
105    // }
106
107    #[must_use]
108    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110        quotes.sort();
111        quotes
112    }
113
114    #[must_use]
115    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117        trades.sort();
118        trades
119    }
120
    /// Returns the submit order commands reported by the order manager, keyed by client
    /// order ID.
    #[must_use]
    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
        self.manager.get_submit_order_commands()
    }
125
126    #[must_use]
127    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128        self.matching_cores.get(instrument_id).cloned()
129    }
130
131    /// Reactivates emulated orders from cache on start.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if no emulated orders are found or processing fails.
136    ///
137    /// # Panics
138    ///
139    /// Panics if a cached client ID cannot be unwrapped.
140    pub fn on_start(&mut self) -> anyhow::Result<()> {
141        let emulated_orders: Vec<OrderAny> = self
142            .cache
143            .borrow()
144            .orders_emulated(None, None, None, None)
145            .into_iter()
146            .cloned()
147            .collect();
148
149        if emulated_orders.is_empty() {
150            log::error!("No emulated orders to reactivate");
151            return Ok(());
152        }
153
154        for order in emulated_orders {
155            if !matches!(
156                order.status(),
157                OrderStatus::Initialized | OrderStatus::Emulated
158            ) {
159                continue; // No longer emulated
160            }
161
162            if let Some(parent_order_id) = &order.parent_order_id() {
163                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164                    order.clone()
165                } else {
166                    log::error!("Cannot handle order: parent {parent_order_id} not found");
167                    continue;
168                };
169
170                let is_position_closed = parent_order
171                    .position_id()
172                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173                if parent_order.is_closed() && is_position_closed {
174                    self.manager.cancel_order(&order);
175                    continue; // Parent already closed
176                }
177
178                if parent_order.contingency_type() == Some(ContingencyType::Oto)
179                    && (parent_order.is_active_local()
180                        || parent_order.filled_qty() == Quantity::zero(0))
181                {
182                    continue; // Process contingency order later once parent triggered
183                }
184            }
185
186            let position_id = self
187                .cache
188                .borrow()
189                .position_id(&order.client_order_id())
190                .copied();
191            let client_id = self
192                .cache
193                .borrow()
194                .client_id(&order.client_order_id())
195                .copied();
196
197            let command = SubmitOrder::new(
198                order.trader_id(),
199                client_id.unwrap(),
200                order.strategy_id(),
201                order.instrument_id(),
202                order.client_order_id(),
203                order.venue_order_id().unwrap(),
204                order.clone(),
205                order.exec_algorithm_id(),
206                position_id,
207                UUID4::new(),
208                self.clock.borrow().timestamp_ns(),
209            )?;
210
211            self.handle_submit_order(command);
212        }
213
214        Ok(())
215    }
216
217    /// # Panics
218    ///
219    /// Panics if the order cannot be converted to a passive order.
220    pub fn on_event(&mut self, event: OrderEventAny) {
221        log::info!("{RECV}{EVT} {event}");
222
223        self.manager.handle_event(event.clone());
224
225        if let Some(order) = self.cache.borrow().order(&event.client_order_id())
226            && order.is_closed()
227            && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
228            && let Err(e) = matching_core.delete_order(
229                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
230            )
231        {
232            log::error!("Error deleting order: {e}");
233        }
234        // else: Order not in cache yet
235    }
236
    /// Stop hook; currently does nothing.
    pub const fn on_stop(&self) {}
238
    /// Resets the order manager and removes all matching cores.
    ///
    /// The recorded subscriptions, subscribed strategies, and monitored positions are left
    /// untouched.
    pub fn on_reset(&mut self) {
        self.manager.reset();
        self.matching_cores.clear();
    }
243
    /// Dispose hook; currently does nothing.
    pub const fn on_dispose(&self) {}
245
246    pub fn execute(&mut self, command: TradingCommand) {
247        log::info!("{RECV}{CMD} {command}");
248
249        match command {
250            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
251            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
252            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
253            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
254            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
255            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
256        }
257    }
258
259    fn create_matching_core(
260        &mut self,
261        instrument_id: InstrumentId,
262        price_increment: Price,
263    ) -> OrderMatchingCore {
264        let matching_core =
265            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
266        self.matching_cores
267            .insert(instrument_id, matching_core.clone());
268        log::info!("Creating matching core for {instrument_id:?}");
269        matching_core
270    }
271
272    /// # Panics
273    ///
274    /// Panics if the emulation trigger type is `NoTrigger`.
275    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
276        let mut order = command.order.clone();
277        let emulation_trigger = order.emulation_trigger();
278
279        assert_ne!(
280            emulation_trigger,
281            Some(TriggerType::NoTrigger),
282            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
283        );
284        assert!(
285            self.manager
286                .get_submit_order_commands()
287                .contains_key(&order.client_order_id()),
288            "command.order.client_order_id must be in submit_order_commands"
289        );
290
291        if !matches!(
292            emulation_trigger,
293            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
294        ) {
295            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
296            self.manager.cancel_order(&order);
297            return;
298        }
299
300        self.check_monitoring(command.strategy_id, command.position_id);
301
302        // Get or create matching core
303        let trigger_instrument_id = order
304            .trigger_instrument_id()
305            .unwrap_or_else(|| order.instrument_id());
306
307        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
308
309        let mut matching_core = if let Some(core) = matching_core {
310            core
311        } else {
312            // Handle synthetic instruments
313            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
314                let synthetic = self
315                    .cache
316                    .borrow()
317                    .synthetic(&trigger_instrument_id)
318                    .cloned();
319                if let Some(synthetic) = synthetic {
320                    (synthetic.id, synthetic.price_increment)
321                } else {
322                    log::error!(
323                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
324                    );
325                    self.manager.cancel_order(&order);
326                    return;
327                }
328            } else {
329                let instrument = self
330                    .cache
331                    .borrow()
332                    .instrument(&trigger_instrument_id)
333                    .cloned();
334                if let Some(instrument) = instrument {
335                    (instrument.id(), instrument.price_increment())
336                } else {
337                    log::error!(
338                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
339                    );
340                    self.manager.cancel_order(&order);
341                    return;
342                }
343            };
344
345            self.create_matching_core(instrument_id, price_increment)
346        };
347
348        // Update trailing stop
349        if matches!(
350            order.order_type(),
351            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
352        ) {
353            self.update_trailing_stop_order(&mut order);
354            if order.trigger_price().is_none() {
355                log::error!(
356                    "Cannot handle trailing stop order with no trigger_price and no market updates"
357                );
358
359                self.manager.cancel_order(&order);
360                return;
361            }
362        }
363
364        // Cache command
365        self.manager.cache_submit_order_command(command);
366
367        // Check if immediately marketable
368        matching_core.match_order(
369            &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
370            true,
371        );
372
373        // Handle data subscriptions
374        match emulation_trigger.unwrap() {
375            TriggerType::Default | TriggerType::BidAsk => {
376                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
377                    if !trigger_instrument_id.is_synthetic() {
378                        // TODO: Impl Actor Trait
379                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
380                    }
381                    // TODO: Impl Actor Trait
382                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
383                    self.subscribed_quotes.insert(trigger_instrument_id);
384                }
385            }
386            TriggerType::LastPrice => {
387                if !self.subscribed_trades.contains(&trigger_instrument_id) {
388                    // TODO: Impl Actor Trait
389                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
390                    self.subscribed_trades.insert(trigger_instrument_id);
391                }
392            }
393            _ => {
394                log::error!("Invalid TriggerType: {emulation_trigger:?}");
395                return;
396            }
397        }
398
399        // Check if order was already released
400        if !self
401            .manager
402            .get_submit_order_commands()
403            .contains_key(&order.client_order_id())
404        {
405            return; // Already released
406        }
407
408        // Hold in matching core
409        if let Err(e) = matching_core
410            .add_order(PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"))
411        {
412            log::error!("Cannot add order: {e:?}");
413            return;
414        }
415
416        // Generate emulated event if needed
417        if order.status() == OrderStatus::Initialized {
418            let event = OrderEmulated::new(
419                order.trader_id(),
420                order.strategy_id(),
421                order.instrument_id(),
422                order.client_order_id(),
423                UUID4::new(),
424                self.clock.borrow().timestamp_ns(),
425                self.clock.borrow().timestamp_ns(),
426            );
427
428            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
429                log::error!("Cannot apply order event: {e:?}");
430                return;
431            }
432
433            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
434                log::error!("Cannot update order: {e:?}");
435                return;
436            }
437
438            self.manager.send_risk_event(OrderEventAny::Emulated(event));
439
440            msgbus::publish(
441                format!("events.order.{}", order.strategy_id()).into(),
442                &OrderEventAny::Emulated(event),
443            );
444        }
445
446        // Since we are cloning the matching core, we need to insert it back into the original hashmap
447        self.matching_cores
448            .insert(trigger_instrument_id, matching_core);
449
450        log::info!("Emulating {order}");
451    }
452
453    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
454        self.check_monitoring(command.strategy_id, command.position_id);
455
456        for order in &command.order_list.orders {
457            if let Some(parent_order_id) = order.parent_order_id() {
458                let cache = self.cache.borrow();
459                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
460                    parent_order
461                } else {
462                    log::error!("Parent order for {} not found", order.client_order_id());
463                    continue;
464                };
465
466                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
467                    continue; // Process contingency order later once parent triggered
468                }
469            }
470
471            if let Err(e) = self.manager.create_new_submit_order(
472                order,
473                command.position_id,
474                Some(command.client_id),
475            ) {
476                log::error!("Error creating new submit order: {e}");
477            }
478        }
479    }
480
481    fn handle_modify_order(&mut self, command: ModifyOrder) {
482        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
483            let price = match command.price {
484                Some(price) => Some(price),
485                None => order.price(),
486            };
487
488            let trigger_price = match command.trigger_price {
489                Some(trigger_price) => Some(trigger_price),
490                None => order.trigger_price(),
491            };
492
493            // Generate event
494            let ts_now = self.clock.borrow().timestamp_ns();
495            let event = OrderUpdated::new(
496                order.trader_id(),
497                order.strategy_id(),
498                order.instrument_id(),
499                order.client_order_id(),
500                command.quantity.unwrap_or(order.quantity()),
501                UUID4::new(),
502                ts_now,
503                ts_now,
504                false,
505                order.venue_order_id(),
506                order.account_id(),
507                price,
508                trigger_price,
509            );
510
511            self.manager.send_exec_event(OrderEventAny::Updated(event));
512
513            let trigger_instrument_id = order
514                .trigger_instrument_id()
515                .unwrap_or_else(|| order.instrument_id());
516
517            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
518                matching_core.match_order(
519                    &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
520                    false,
521                );
522            } else {
523                log::error!(
524                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
525                );
526            }
527        } else {
528            log::error!("Cannot modify order: {} not found", command.client_order_id);
529        }
530    }
531
532    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
533        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
534            order.clone()
535        } else {
536            log::error!("Cannot cancel order: {} not found", command.client_order_id);
537            return;
538        };
539
540        let trigger_instrument_id = order
541            .trigger_instrument_id()
542            .unwrap_or_else(|| order.instrument_id());
543
544        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
545            core
546        } else {
547            self.manager.cancel_order(&order);
548            return;
549        };
550
551        if !matching_core.order_exists(order.client_order_id())
552            && order.is_open()
553            && !order.is_pending_cancel()
554        {
555            // Order not held in the emulator
556            self.manager
557                .send_exec_command(TradingCommand::CancelOrder(command));
558        } else {
559            self.manager.cancel_order(&order);
560        }
561    }
562
563    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
564        let instrument_id = command.instrument_id;
565        let matching_core = match self.matching_cores.get(&instrument_id) {
566            Some(core) => core,
567            None => return, // No orders to cancel
568        };
569
570        let orders_to_cancel = match command.order_side {
571            OrderSide::NoOrderSide => {
572                // Get both bid and ask orders
573                let mut all_orders = Vec::new();
574                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
575                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
576                all_orders
577            }
578            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
579            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
580        };
581
582        // Process all orders in a single iteration
583        for order in orders_to_cancel {
584            self.manager.cancel_order(&OrderAny::from(order));
585        }
586    }
587
588    // --------------------------------------------------------------------------------------------
589
590    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
591        log::info!(
592            "Updating order {} quantity to {new_quantity}",
593            order.client_order_id(),
594        );
595
596        // Generate event
597        let ts_now = self.clock.borrow().timestamp_ns();
598        let event = OrderUpdated::new(
599            order.trader_id(),
600            order.strategy_id(),
601            order.instrument_id(),
602            order.client_order_id(),
603            new_quantity,
604            UUID4::new(),
605            ts_now,
606            ts_now,
607            false,
608            None,
609            order.account_id(),
610            None,
611            None,
612        );
613
614        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
615            log::error!("Cannot apply order event: {e:?}");
616            return;
617        }
618        if let Err(e) = self.cache.borrow_mut().update_order(order) {
619            log::error!("Cannot update order: {e:?}");
620            return;
621        }
622
623        self.manager.send_risk_event(OrderEventAny::Updated(event));
624    }
625
626    // -----------------------------------------------------------------------------------------------
627
628    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
629        log::debug!("Processing {deltas:?}");
630
631        let instrument_id = &deltas.instrument_id;
632        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
633            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
634                let best_bid = book.best_bid_price();
635                let best_ask = book.best_ask_price();
636
637                if let Some(best_bid) = best_bid {
638                    matching_core.set_bid_raw(best_bid);
639                }
640
641                if let Some(best_ask) = best_ask {
642                    matching_core.set_ask_raw(best_ask);
643                }
644            } else {
645                log::error!(
646                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
647                    deltas.instrument_id
648                );
649            }
650
651            self.iterate_orders(instrument_id);
652        } else {
653            log::error!(
654                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
655                deltas.instrument_id
656            );
657        }
658    }
659
660    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
661        log::debug!("Processing {quote}:?");
662
663        let instrument_id = &quote.instrument_id;
664        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
665            matching_core.set_bid_raw(quote.bid_price);
666            matching_core.set_ask_raw(quote.ask_price);
667
668            self.iterate_orders(instrument_id);
669        } else {
670            log::error!(
671                "Cannot handle `QuoteTick`: no matching core for instrument {}",
672                quote.instrument_id
673            );
674        }
675    }
676
677    pub fn on_trade_tick(&mut self, trade: TradeTick) {
678        log::debug!("Processing {trade:?}");
679
680        let instrument_id = &trade.instrument_id;
681        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
682            matching_core.set_last_raw(trade.price);
683            if !self.subscribed_quotes.contains(instrument_id) {
684                matching_core.set_bid_raw(trade.price);
685                matching_core.set_ask_raw(trade.price);
686            }
687
688            self.iterate_orders(instrument_id);
689        } else {
690            log::error!(
691                "Cannot handle `TradeTick`: no matching core for instrument {}",
692                trade.instrument_id
693            );
694        }
695    }
696
697    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
698        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
699            matching_core.iterate();
700
701            matching_core.get_orders()
702        } else {
703            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
704            return;
705        };
706
707        for order in orders {
708            if order.is_closed() {
709                continue;
710            }
711
712            let mut order: OrderAny = order.clone().into();
713            if matches!(
714                order.order_type(),
715                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
716            ) {
717                self.update_trailing_stop_order(&mut order);
718            }
719        }
720    }
721
722    /// # Panics
723    ///
724    /// Panics if the order cannot be converted to a passive order.
725    pub fn cancel_order(&mut self, order: &OrderAny) {
726        log::info!("Canceling order {}", order.client_order_id());
727
728        let mut order = order.clone();
729        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
730
731        let trigger_instrument_id = order
732            .trigger_instrument_id()
733            .unwrap_or(order.instrument_id());
734
735        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
736            && let Err(e) = matching_core.delete_order(
737                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
738            )
739        {
740            log::error!("Cannot delete order: {e:?}");
741        }
742
743        self.cache
744            .borrow_mut()
745            .update_order_pending_cancel_local(&order);
746
747        // Generate event
748        let ts_now = self.clock.borrow().timestamp_ns();
749        let event = OrderCanceled::new(
750            order.trader_id(),
751            order.strategy_id(),
752            order.instrument_id(),
753            order.client_order_id(),
754            UUID4::new(),
755            ts_now,
756            ts_now,
757            false,
758            order.venue_order_id(),
759            order.account_id(),
760        );
761
762        self.manager.send_exec_event(OrderEventAny::Canceled(event));
763    }
764
765    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
766        if !self.subscribed_strategies.contains(&strategy_id) {
767            // Subscribe to all strategy events
768            if let Some(handler) = &self.on_event_handler {
769                msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
770                msgbus::subscribe_str(
771                    format!("events.position.{strategy_id}"),
772                    handler.clone(),
773                    None,
774                );
775                self.subscribed_strategies.insert(strategy_id);
776                log::info!("Subscribed to strategy {strategy_id} order and position events");
777            }
778        }
779
780        if let Some(position_id) = position_id
781            && !self.monitored_positions.contains(&position_id)
782        {
783            self.monitored_positions.insert(position_id);
784        }
785    }
786
787    /// Validates market data availability for order release.
788    ///
789    /// Returns `Some(released_price)` if market data is available, `None` otherwise.
790    /// Logs appropriate warnings when market data is not yet available.
791    ///
792    /// Does NOT pop the submit order command - caller must do that and handle missing command
793    /// according to their contract (panic for market orders, return for limit orders).
794    fn validate_release(
795        &self,
796        order: &OrderAny,
797        matching_core: &OrderMatchingCore,
798        trigger_instrument_id: InstrumentId,
799    ) -> Option<Price> {
800        let released_price = match order.order_side_specified() {
801            OrderSideSpecified::Buy => matching_core.ask,
802            OrderSideSpecified::Sell => matching_core.bid,
803        };
804
805        if released_price.is_none() {
806            log::warn!(
807                "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
808                order.client_order_id(),
809            );
810            return None;
811        }
812
813        Some(released_price.unwrap())
814    }
815
816    /// # Panics
817    ///
818    /// Panics if the order type is invalid for a stop order.
819    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
820        match order.order_type() {
821            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
822                self.fill_limit_order(order);
823            }
824            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
825                self.fill_market_order(order);
826            }
827            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
828        }
829    }
830
831    /// # Panics
832    ///
833    /// Panics if a limit order has no price.
834    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
835        if matches!(order.order_type(), OrderType::Limit) {
836            self.fill_market_order(order);
837            return;
838        }
839
840        let trigger_instrument_id = order
841            .trigger_instrument_id()
842            .unwrap_or(order.instrument_id());
843
844        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
845            Some(core) => core,
846            None => {
847                log::error!(
848                    "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
849                );
850                return; // Order stays queued for retry
851            }
852        };
853
854        let released_price =
855            match self.validate_release(order, matching_core, trigger_instrument_id) {
856                Some(price) => price,
857                None => return, // Order stays queued for retry
858            };
859
860        let mut command = match self
861            .manager
862            .pop_submit_order_command(order.client_order_id())
863        {
864            Some(command) => command,
865            None => return, // Order already released
866        };
867
868        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
869            if let Err(e) = matching_core.delete_order(
870                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
871            ) {
872                log::error!("Error deleting order: {e:?}");
873            }
874
875            let emulation_trigger = TriggerType::NoTrigger;
876
877            // Transform order
878            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
879                order.trader_id(),
880                order.strategy_id(),
881                order.instrument_id(),
882                order.client_order_id(),
883                order.order_side(),
884                order.quantity(),
885                order.price().unwrap(),
886                order.time_in_force(),
887                order.expire_time(),
888                order.is_post_only(),
889                order.is_reduce_only(),
890                order.is_quote_quantity(),
891                order.display_qty(),
892                Some(emulation_trigger),
893                Some(trigger_instrument_id),
894                order.contingency_type(),
895                order.order_list_id(),
896                order.linked_order_ids().map(Vec::from),
897                order.parent_order_id(),
898                order.exec_algorithm_id(),
899                order.exec_algorithm_params().cloned(),
900                order.exec_spawn_id(),
901                order.tags().map(Vec::from),
902                UUID4::new(),
903                self.clock.borrow().timestamp_ns(),
904            ) {
905                transformed
906            } else {
907                log::error!("Cannot create limit order");
908                return;
909            };
910            transformed.liquidity_side = order.liquidity_side();
911
912            // TODO: fix
913            // let triggered_price = order.trigger_price();
914            // if triggered_price.is_some() {
915            //     transformed.trigger_price() = (triggered_price.unwrap());
916            // }
917
918            let original_events = order.events();
919
920            for event in original_events {
921                transformed.events.insert(0, event.clone());
922            }
923
924            if let Err(e) = self.cache.borrow_mut().add_order(
925                OrderAny::Limit(transformed.clone()),
926                command.position_id,
927                Some(command.client_id),
928                true,
929            ) {
930                log::error!("Failed to add order: {e}");
931            }
932
933            // Replace commands order with transformed order
934            command.order = OrderAny::Limit(transformed.clone());
935
936            msgbus::publish(
937                format!("events.order.{}", order.strategy_id()).into(),
938                transformed.last_event(),
939            );
940
941            // Generate event
942            let event = OrderReleased::new(
943                order.trader_id(),
944                order.strategy_id(),
945                order.instrument_id(),
946                order.client_order_id(),
947                released_price,
948                UUID4::new(),
949                self.clock.borrow().timestamp_ns(),
950                self.clock.borrow().timestamp_ns(),
951            );
952
953            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
954                log::error!("Failed to apply order event: {e}");
955            }
956
957            if let Err(e) = self
958                .cache
959                .borrow_mut()
960                .update_order(&OrderAny::Limit(transformed.clone()))
961            {
962                log::error!("Failed to update order: {e}");
963            }
964
965            self.manager.send_risk_event(OrderEventAny::Released(event));
966
967            log::info!("Releasing order {}", order.client_order_id());
968
969            // Publish event
970            msgbus::publish(
971                format!("events.order.{}", transformed.strategy_id()).into(),
972                &OrderEventAny::Released(event),
973            );
974
975            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
976                self.manager.send_algo_command(command, exec_algorithm_id);
977            } else {
978                self.manager
979                    .send_exec_command(TradingCommand::SubmitOrder(command));
980            }
981        }
982    }
983
984    /// # Panics
985    ///
986    /// Panics if a market order command is missing.
987    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
988        let trigger_instrument_id = order
989            .trigger_instrument_id()
990            .unwrap_or(order.instrument_id());
991
992        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
993            Some(core) => core,
994            None => {
995                log::error!(
996                    "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
997                );
998                return; // Order stays queued for retry
999            }
1000        };
1001
1002        let released_price =
1003            match self.validate_release(order, matching_core, trigger_instrument_id) {
1004                Some(price) => price,
1005                None => return, // Order stays queued for retry
1006            };
1007
1008        let mut command = self
1009            .manager
1010            .pop_submit_order_command(order.client_order_id())
1011            .expect("invalid operation `fill_market_order` with no command");
1012
1013        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1014            if let Err(e) = matching_core.delete_order(
1015                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1016            ) {
1017                log::error!("Cannot delete order: {e:?}");
1018            }
1019
1020            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1021
1022            // Transform order
1023            let mut transformed = MarketOrder::new(
1024                order.trader_id(),
1025                order.strategy_id(),
1026                order.instrument_id(),
1027                order.client_order_id(),
1028                order.order_side(),
1029                order.quantity(),
1030                order.time_in_force(),
1031                UUID4::new(),
1032                self.clock.borrow().timestamp_ns(),
1033                order.is_reduce_only(),
1034                order.is_quote_quantity(),
1035                order.contingency_type(),
1036                order.order_list_id(),
1037                order.linked_order_ids().map(Vec::from),
1038                order.parent_order_id(),
1039                order.exec_algorithm_id(),
1040                order.exec_algorithm_params().cloned(),
1041                order.exec_spawn_id(),
1042                order.tags().map(Vec::from),
1043            );
1044
1045            let original_events = order.events();
1046
1047            for event in original_events {
1048                transformed.events.insert(0, event.clone());
1049            }
1050
1051            if let Err(e) = self.cache.borrow_mut().add_order(
1052                OrderAny::Market(transformed.clone()),
1053                command.position_id,
1054                Some(command.client_id),
1055                true,
1056            ) {
1057                log::error!("Failed to add order: {e}");
1058            }
1059
1060            // Replace commands order with transformed order
1061            command.order = OrderAny::Market(transformed.clone());
1062
1063            msgbus::publish(
1064                format!("events.order.{}", order.strategy_id()).into(),
1065                transformed.last_event(),
1066            );
1067
1068            // Generate event
1069            let ts_now = self.clock.borrow().timestamp_ns();
1070            let event = OrderReleased::new(
1071                order.trader_id(),
1072                order.strategy_id(),
1073                order.instrument_id(),
1074                order.client_order_id(),
1075                released_price,
1076                UUID4::new(),
1077                ts_now,
1078                ts_now,
1079            );
1080
1081            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1082                log::error!("Failed to apply order event: {e}");
1083            }
1084
1085            if let Err(e) = self
1086                .cache
1087                .borrow_mut()
1088                .update_order(&OrderAny::Market(transformed))
1089            {
1090                log::error!("Failed to update order: {e}");
1091            }
1092            self.manager.send_risk_event(OrderEventAny::Released(event));
1093
1094            log::info!("Releasing order {}", order.client_order_id());
1095
1096            // Publish event
1097            msgbus::publish(
1098                format!("events.order.{}", order.strategy_id()).into(),
1099                &OrderEventAny::Released(event),
1100            );
1101
1102            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1103                self.manager.send_algo_command(command, exec_algorithm_id);
1104            } else {
1105                self.manager
1106                    .send_exec_command(TradingCommand::SubmitOrder(command));
1107            }
1108        }
1109    }
1110
1111    #[allow(clippy::too_many_lines)]
1112    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1113        let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1114            log::error!(
1115                "Cannot update trailing-stop order: no matching core for instrument {}",
1116                order.instrument_id()
1117            );
1118            return;
1119        };
1120
1121        let mut bid = matching_core.bid;
1122        let mut ask = matching_core.ask;
1123        let mut last = matching_core.last;
1124
1125        if bid.is_none() || ask.is_none() || last.is_none() {
1126            if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1127                bid.get_or_insert(q.bid_price);
1128                ask.get_or_insert(q.ask_price);
1129            }
1130            if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1131                last.get_or_insert(t.price);
1132            }
1133        }
1134
1135        let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1136            matching_core.price_increment,
1137            order.trigger_price(),
1138            order.activation_price(),
1139            order,
1140            bid,
1141            ask,
1142            last,
1143        ) {
1144            Ok(pair) => pair,
1145            Err(e) => {
1146                log::warn!("Cannot calculate trailing-stop update: {e}");
1147                return;
1148            }
1149        };
1150
1151        if new_trigger_px.is_none() && new_limit_px.is_none() {
1152            return;
1153        }
1154
1155        let ts_now = self.clock.borrow().timestamp_ns();
1156        let update = OrderUpdated::new(
1157            order.trader_id(),
1158            order.strategy_id(),
1159            order.instrument_id(),
1160            order.client_order_id(),
1161            order.quantity(),
1162            UUID4::new(),
1163            ts_now,
1164            ts_now,
1165            false,
1166            order.venue_order_id(),
1167            order.account_id(),
1168            new_limit_px,
1169            new_trigger_px,
1170        );
1171        let wrapped = OrderEventAny::Updated(update);
1172        if let Err(e) = order.apply(wrapped.clone()) {
1173            log::error!("Failed to apply order event: {e}");
1174            return;
1175        }
1176        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1177            log::error!("Failed to update order in cache: {e}");
1178            return;
1179        }
1180        self.manager.send_risk_event(wrapped);
1181    }
1182}