nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::{HashMap, HashSet},
19    fmt::Debug,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    logging::{CMD, EVT, RECV},
27    messages::execution::{
28        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::UUID4;
33use nautilus_model::{
34    data::{OrderBookDeltas, QuoteTick, TradeTick},
35    enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
36    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38    instruments::Instrument,
39    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40    types::{Price, Quantity},
41};
42
43use crate::{
44    matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45    trailing::trailing_stop_calculate,
46};
47
48pub struct OrderEmulator {
49    clock: Rc<RefCell<dyn Clock>>,
50    cache: Rc<RefCell<Cache>>,
51    manager: OrderManager,
52    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
53    subscribed_quotes: HashSet<InstrumentId>,
54    subscribed_trades: HashSet<InstrumentId>,
55    subscribed_strategies: HashSet<StrategyId>,
56    monitored_positions: HashSet<PositionId>,
57    on_event_handler: Option<ShareableMessageHandler>,
58}
59
60impl Debug for OrderEmulator {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct(stringify!(OrderEmulator))
63            .field("cores", &self.matching_cores.len())
64            .field("subscribed_quotes", &self.subscribed_quotes.len())
65            .finish()
66    }
67}
68
69impl OrderEmulator {
70    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71        // TODO: Impl Actor Trait
72        // self.register_base(portfolio, msgbus, cache, clock);
73
74        let active_local = true;
75        let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77        Self {
78            clock,
79            cache,
80            manager,
81            matching_cores: HashMap::new(),
82            subscribed_quotes: HashSet::new(),
83            subscribed_trades: HashSet::new(),
84            subscribed_strategies: HashSet::new(),
85            monitored_positions: HashSet::new(),
86            on_event_handler: None,
87        }
88    }
89
90    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91        self.on_event_handler = Some(handler);
92    }
93
94    // TODO: WIP
95    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
96    //     self.manager.set_submit_order_handler(handler);
97    // }
98    //
99    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
100    //     self.manager.set_cancel_order_handler(handler);
101    // }
102    //
103    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
104    //     self.manager.set_modify_order_handler(handler);
105    // }
106
107    #[must_use]
108    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110        quotes.sort();
111        quotes
112    }
113
114    #[must_use]
115    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117        trades.sort();
118        trades
119    }
120
121    #[must_use]
122    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123        self.manager.get_submit_order_commands()
124    }
125
126    #[must_use]
127    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128        self.matching_cores.get(instrument_id).cloned()
129    }
130
131    /// Reactivates emulated orders from cache on start.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if no emulated orders are found or processing fails.
136    ///
137    /// # Panics
138    ///
139    /// Panics if a cached client ID cannot be unwrapped.
140    pub fn on_start(&mut self) -> anyhow::Result<()> {
141        let emulated_orders: Vec<OrderAny> = self
142            .cache
143            .borrow()
144            .orders_emulated(None, None, None, None)
145            .into_iter()
146            .cloned()
147            .collect();
148
149        if emulated_orders.is_empty() {
150            log::error!("No emulated orders to reactivate");
151            return Ok(());
152        }
153
154        for order in emulated_orders {
155            if !matches!(
156                order.status(),
157                OrderStatus::Initialized | OrderStatus::Emulated
158            ) {
159                continue; // No longer emulated
160            }
161
162            if let Some(parent_order_id) = &order.parent_order_id() {
163                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164                    order.clone()
165                } else {
166                    log::error!("Cannot handle order: parent {parent_order_id} not found");
167                    continue;
168                };
169
170                let is_position_closed = parent_order
171                    .position_id()
172                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173                if parent_order.is_closed() && is_position_closed {
174                    self.manager.cancel_order(&order);
175                    continue; // Parent already closed
176                }
177
178                if parent_order.contingency_type() == Some(ContingencyType::Oto)
179                    && (parent_order.is_active_local()
180                        || parent_order.filled_qty() == Quantity::zero(0))
181                {
182                    continue; // Process contingency order later once parent triggered
183                }
184            }
185
186            let position_id = self
187                .cache
188                .borrow()
189                .position_id(&order.client_order_id())
190                .copied();
191            let client_id = self
192                .cache
193                .borrow()
194                .client_id(&order.client_order_id())
195                .copied();
196
197            let command = SubmitOrder::new(
198                order.trader_id(),
199                client_id.unwrap(),
200                order.strategy_id(),
201                order.instrument_id(),
202                order.client_order_id(),
203                order.venue_order_id().unwrap(),
204                order.clone(),
205                order.exec_algorithm_id(),
206                position_id,
207                UUID4::new(),
208                self.clock.borrow().timestamp_ns(),
209            )?;
210
211            self.handle_submit_order(command);
212        }
213
214        Ok(())
215    }
216
217    pub fn on_event(&mut self, event: OrderEventAny) {
218        log::info!("{RECV}{EVT} {event}");
219
220        self.manager.handle_event(event.clone());
221
222        if let Some(order) = self.cache.borrow().order(&event.client_order_id())
223            && order.is_closed()
224            && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
225            && let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
226        {
227            log::error!("Error deleting order: {e}");
228        }
229        // else: Order not in cache yet
230    }
231
232    pub const fn on_stop(&self) {}
233
234    pub fn on_reset(&mut self) {
235        self.manager.reset();
236        self.matching_cores.clear();
237    }
238
239    pub const fn on_dispose(&self) {}
240
241    pub fn execute(&mut self, command: TradingCommand) {
242        log::info!("{RECV}{CMD} {command}");
243
244        match command {
245            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
246            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
247            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
248            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
249            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
250            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
251        }
252    }
253
254    fn create_matching_core(
255        &mut self,
256        instrument_id: InstrumentId,
257        price_increment: Price,
258    ) -> OrderMatchingCore {
259        let matching_core =
260            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
261        self.matching_cores
262            .insert(instrument_id, matching_core.clone());
263        log::info!("Creating matching core for {instrument_id:?}");
264        matching_core
265    }
266
267    /// # Panics
268    ///
269    /// Panics if the emulation trigger type is `NoTrigger`.
270    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
271        let mut order = command.order.clone();
272        let emulation_trigger = order.emulation_trigger();
273
274        assert_ne!(
275            emulation_trigger,
276            Some(TriggerType::NoTrigger),
277            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
278        );
279        assert!(
280            self.manager
281                .get_submit_order_commands()
282                .contains_key(&order.client_order_id()),
283            "command.order.client_order_id must be in submit_order_commands"
284        );
285
286        if !matches!(
287            emulation_trigger,
288            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
289        ) {
290            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
291            self.manager.cancel_order(&order);
292            return;
293        }
294
295        self.check_monitoring(command.strategy_id, command.position_id);
296
297        // Get or create matching core
298        let trigger_instrument_id = order
299            .trigger_instrument_id()
300            .unwrap_or_else(|| order.instrument_id());
301
302        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
303
304        let mut matching_core = if let Some(core) = matching_core {
305            core
306        } else {
307            // Handle synthetic instruments
308            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
309                let synthetic = self
310                    .cache
311                    .borrow()
312                    .synthetic(&trigger_instrument_id)
313                    .cloned();
314                if let Some(synthetic) = synthetic {
315                    (synthetic.id, synthetic.price_increment)
316                } else {
317                    log::error!(
318                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
319                    );
320                    self.manager.cancel_order(&order);
321                    return;
322                }
323            } else {
324                let instrument = self
325                    .cache
326                    .borrow()
327                    .instrument(&trigger_instrument_id)
328                    .cloned();
329                if let Some(instrument) = instrument {
330                    (instrument.id(), instrument.price_increment())
331                } else {
332                    log::error!(
333                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
334                    );
335                    self.manager.cancel_order(&order);
336                    return;
337                }
338            };
339
340            self.create_matching_core(instrument_id, price_increment)
341        };
342
343        // Update trailing stop
344        if matches!(
345            order.order_type(),
346            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
347        ) {
348            self.update_trailing_stop_order(&mut order);
349            if order.trigger_price().is_none() {
350                log::error!(
351                    "Cannot handle trailing stop order with no trigger_price and no market updates"
352                );
353
354                self.manager.cancel_order(&order);
355                return;
356            }
357        }
358
359        // Cache command
360        self.manager.cache_submit_order_command(command);
361
362        // Check if immediately marketable
363        matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
364
365        // Handle data subscriptions
366        match emulation_trigger.unwrap() {
367            TriggerType::Default | TriggerType::BidAsk => {
368                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
369                    if !trigger_instrument_id.is_synthetic() {
370                        // TODO: Impl Actor Trait
371                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
372                    }
373                    // TODO: Impl Actor Trait
374                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
375                    self.subscribed_quotes.insert(trigger_instrument_id);
376                }
377            }
378            TriggerType::LastPrice => {
379                if !self.subscribed_trades.contains(&trigger_instrument_id) {
380                    // TODO: Impl Actor Trait
381                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
382                    self.subscribed_trades.insert(trigger_instrument_id);
383                }
384            }
385            _ => {
386                log::error!("Invalid TriggerType: {emulation_trigger:?}");
387                return;
388            }
389        }
390
391        // Check if order was already released
392        if !self
393            .manager
394            .get_submit_order_commands()
395            .contains_key(&order.client_order_id())
396        {
397            return; // Already released
398        }
399
400        // Hold in matching core
401        if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
402            log::error!("Cannot add order: {e:?}");
403            return;
404        }
405
406        // Generate emulated event if needed
407        if order.status() == OrderStatus::Initialized {
408            let event = OrderEmulated::new(
409                order.trader_id(),
410                order.strategy_id(),
411                order.instrument_id(),
412                order.client_order_id(),
413                UUID4::new(),
414                self.clock.borrow().timestamp_ns(),
415                self.clock.borrow().timestamp_ns(),
416            );
417
418            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
419                log::error!("Cannot apply order event: {e:?}");
420                return;
421            }
422
423            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
424                log::error!("Cannot update order: {e:?}");
425                return;
426            }
427
428            self.manager.send_risk_event(OrderEventAny::Emulated(event));
429
430            msgbus::publish(
431                format!("events.order.{}", order.strategy_id()).into(),
432                &OrderEventAny::Emulated(event),
433            );
434        }
435
436        // Since we are cloning the matching core, we need to insert it back into the original hashmap
437        self.matching_cores
438            .insert(trigger_instrument_id, matching_core);
439
440        log::info!("Emulating {order}");
441    }
442
443    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
444        self.check_monitoring(command.strategy_id, command.position_id);
445
446        for order in &command.order_list.orders {
447            if let Some(parent_order_id) = order.parent_order_id() {
448                let cache = self.cache.borrow();
449                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
450                    parent_order
451                } else {
452                    log::error!("Parent order for {} not found", order.client_order_id());
453                    continue;
454                };
455
456                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
457                    continue; // Process contingency order later once parent triggered
458                }
459            }
460
461            if let Err(e) = self.manager.create_new_submit_order(
462                order,
463                command.position_id,
464                Some(command.client_id),
465            ) {
466                log::error!("Error creating new submit order: {e}");
467            }
468        }
469    }
470
471    fn handle_modify_order(&mut self, command: ModifyOrder) {
472        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
473            let price = match command.price {
474                Some(price) => Some(price),
475                None => order.price(),
476            };
477
478            let trigger_price = match command.trigger_price {
479                Some(trigger_price) => Some(trigger_price),
480                None => order.trigger_price(),
481            };
482
483            // Generate event
484            let ts_now = self.clock.borrow().timestamp_ns();
485            let event = OrderUpdated::new(
486                order.trader_id(),
487                order.strategy_id(),
488                order.instrument_id(),
489                order.client_order_id(),
490                command.quantity.unwrap_or(order.quantity()),
491                UUID4::new(),
492                ts_now,
493                ts_now,
494                false,
495                order.venue_order_id(),
496                order.account_id(),
497                price,
498                trigger_price,
499            );
500
501            self.manager.send_exec_event(OrderEventAny::Updated(event));
502
503            let trigger_instrument_id = order
504                .trigger_instrument_id()
505                .unwrap_or_else(|| order.instrument_id());
506
507            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
508                matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
509            } else {
510                log::error!(
511                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
512                );
513            }
514        } else {
515            log::error!("Cannot modify order: {} not found", command.client_order_id);
516        }
517    }
518
519    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
520        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
521            order.clone()
522        } else {
523            log::error!("Cannot cancel order: {} not found", command.client_order_id);
524            return;
525        };
526
527        let trigger_instrument_id = order
528            .trigger_instrument_id()
529            .unwrap_or_else(|| order.instrument_id());
530
531        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
532            core
533        } else {
534            self.manager.cancel_order(&order);
535            return;
536        };
537
538        if !matching_core.order_exists(order.client_order_id())
539            && order.is_open()
540            && !order.is_pending_cancel()
541        {
542            // Order not held in the emulator
543            self.manager
544                .send_exec_command(TradingCommand::CancelOrder(command));
545        } else {
546            self.manager.cancel_order(&order);
547        }
548    }
549
550    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
551        let instrument_id = command.instrument_id;
552        let matching_core = match self.matching_cores.get(&instrument_id) {
553            Some(core) => core,
554            None => return, // No orders to cancel
555        };
556
557        let orders_to_cancel = match command.order_side {
558            OrderSide::NoOrderSide => {
559                // Get both bid and ask orders
560                let mut all_orders = Vec::new();
561                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
562                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
563                all_orders
564            }
565            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
566            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
567        };
568
569        // Process all orders in a single iteration
570        for order in orders_to_cancel {
571            self.manager.cancel_order(&OrderAny::from(order));
572        }
573    }
574
575    // --------------------------------------------------------------------------------------------
576
577    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
578        log::info!(
579            "Updating order {} quantity to {new_quantity}",
580            order.client_order_id(),
581        );
582
583        // Generate event
584        let ts_now = self.clock.borrow().timestamp_ns();
585        let event = OrderUpdated::new(
586            order.trader_id(),
587            order.strategy_id(),
588            order.instrument_id(),
589            order.client_order_id(),
590            new_quantity,
591            UUID4::new(),
592            ts_now,
593            ts_now,
594            false,
595            None,
596            order.account_id(),
597            None,
598            None,
599        );
600
601        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
602            log::error!("Cannot apply order event: {e:?}");
603            return;
604        }
605        if let Err(e) = self.cache.borrow_mut().update_order(order) {
606            log::error!("Cannot update order: {e:?}");
607            return;
608        }
609
610        self.manager.send_risk_event(OrderEventAny::Updated(event));
611    }
612
613    // -----------------------------------------------------------------------------------------------
614
615    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
616        log::debug!("Processing {deltas:?}");
617
618        let instrument_id = &deltas.instrument_id;
619        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
620            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
621                let best_bid = book.best_bid_price();
622                let best_ask = book.best_ask_price();
623
624                if let Some(best_bid) = best_bid {
625                    matching_core.set_bid_raw(best_bid);
626                }
627
628                if let Some(best_ask) = best_ask {
629                    matching_core.set_ask_raw(best_ask);
630                }
631            } else {
632                log::error!(
633                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
634                    deltas.instrument_id
635                );
636            }
637
638            self.iterate_orders(instrument_id);
639        } else {
640            log::error!(
641                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
642                deltas.instrument_id
643            );
644        }
645    }
646
647    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
648        log::debug!("Processing {quote}:?");
649
650        let instrument_id = &quote.instrument_id;
651        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
652            matching_core.set_bid_raw(quote.bid_price);
653            matching_core.set_ask_raw(quote.ask_price);
654
655            self.iterate_orders(instrument_id);
656        } else {
657            log::error!(
658                "Cannot handle `QuoteTick`: no matching core for instrument {}",
659                quote.instrument_id
660            );
661        }
662    }
663
664    pub fn on_trade_tick(&mut self, trade: TradeTick) {
665        log::debug!("Processing {trade:?}");
666
667        let instrument_id = &trade.instrument_id;
668        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
669            matching_core.set_last_raw(trade.price);
670            if !self.subscribed_quotes.contains(instrument_id) {
671                matching_core.set_bid_raw(trade.price);
672                matching_core.set_ask_raw(trade.price);
673            }
674
675            self.iterate_orders(instrument_id);
676        } else {
677            log::error!(
678                "Cannot handle `TradeTick`: no matching core for instrument {}",
679                trade.instrument_id
680            );
681        }
682    }
683
684    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
685        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
686            matching_core.iterate();
687
688            matching_core.get_orders()
689        } else {
690            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
691            return;
692        };
693
694        for order in orders {
695            if order.is_closed() {
696                continue;
697            }
698
699            let mut order: OrderAny = order.clone().into();
700            if matches!(
701                order.order_type(),
702                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
703            ) {
704                self.update_trailing_stop_order(&mut order);
705            }
706        }
707    }
708
709    pub fn cancel_order(&mut self, order: &OrderAny) {
710        log::info!("Canceling order {}", order.client_order_id());
711
712        let mut order = order.clone();
713        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
714
715        let trigger_instrument_id = order
716            .trigger_instrument_id()
717            .unwrap_or(order.instrument_id());
718
719        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
720            && let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
721        {
722            log::error!("Cannot delete order: {e:?}");
723        }
724
725        self.cache
726            .borrow_mut()
727            .update_order_pending_cancel_local(&order);
728
729        // Generate event
730        let ts_now = self.clock.borrow().timestamp_ns();
731        let event = OrderCanceled::new(
732            order.trader_id(),
733            order.strategy_id(),
734            order.instrument_id(),
735            order.client_order_id(),
736            UUID4::new(),
737            ts_now,
738            ts_now,
739            false,
740            order.venue_order_id(),
741            order.account_id(),
742        );
743
744        self.manager.send_exec_event(OrderEventAny::Canceled(event));
745    }
746
747    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
748        if !self.subscribed_strategies.contains(&strategy_id) {
749            // Subscribe to all strategy events
750            if let Some(handler) = &self.on_event_handler {
751                msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
752                msgbus::subscribe_str(
753                    format!("events.position.{strategy_id}"),
754                    handler.clone(),
755                    None,
756                );
757                self.subscribed_strategies.insert(strategy_id);
758                log::info!("Subscribed to strategy {strategy_id} order and position events");
759            }
760        }
761
762        if let Some(position_id) = position_id
763            && !self.monitored_positions.contains(&position_id)
764        {
765            self.monitored_positions.insert(position_id);
766        }
767    }
768
769    /// # Panics
770    ///
771    /// Panics if the order type is invalid for a stop order.
772    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
773        match order.order_type() {
774            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
775                self.fill_limit_order(order);
776            }
777            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
778                self.fill_market_order(order);
779            }
780            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
781        }
782    }
783
784    /// # Panics
785    ///
786    /// Panics if a limit order has no price.
787    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
788        if matches!(order.order_type(), OrderType::Limit) {
789            self.fill_market_order(order);
790            return;
791        }
792
793        // Fetch command
794        let mut command = match self
795            .manager
796            .pop_submit_order_command(order.client_order_id())
797        {
798            Some(command) => command,
799            None => return, // Order already released
800        };
801
802        let trigger_instrument_id = order
803            .trigger_instrument_id()
804            .unwrap_or(order.instrument_id());
805
806        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
807            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
808                log::error!("Error deleting order: {e:?}");
809            }
810
811            let emulation_trigger = TriggerType::NoTrigger;
812
813            // Transform order
814            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
815                order.trader_id(),
816                order.strategy_id(),
817                order.instrument_id(),
818                order.client_order_id(),
819                order.order_side(),
820                order.quantity(),
821                order.price().unwrap(),
822                order.time_in_force(),
823                order.expire_time(),
824                order.is_post_only(),
825                order.is_reduce_only(),
826                order.is_quote_quantity(),
827                order.display_qty(),
828                Some(emulation_trigger),
829                Some(trigger_instrument_id),
830                order.contingency_type(),
831                order.order_list_id(),
832                order.linked_order_ids().map(Vec::from),
833                order.parent_order_id(),
834                order.exec_algorithm_id(),
835                order.exec_algorithm_params().cloned(),
836                order.exec_spawn_id(),
837                order.tags().map(Vec::from),
838                UUID4::new(),
839                self.clock.borrow().timestamp_ns(),
840            ) {
841                transformed
842            } else {
843                log::error!("Cannot create limit order");
844                return;
845            };
846            transformed.liquidity_side = order.liquidity_side();
847
848            // TODO: fix
849            // let triggered_price = order.trigger_price();
850            // if triggered_price.is_some() {
851            //     transformed.trigger_price() = (triggered_price.unwrap());
852            // }
853
854            let original_events = order.events();
855
856            for event in original_events {
857                transformed.events.insert(0, event.clone());
858            }
859
860            if let Err(e) = self.cache.borrow_mut().add_order(
861                OrderAny::Limit(transformed.clone()),
862                command.position_id,
863                Some(command.client_id),
864                true,
865            ) {
866                log::error!("Failed to add order: {e}");
867            }
868
869            // Replace commands order with transformed order
870            command.order = OrderAny::Limit(transformed.clone());
871
872            msgbus::publish(
873                format!("events.order.{}", order.strategy_id()).into(),
874                transformed.last_event(),
875            );
876
877            // Determine triggered price
878            let released_price = match order.order_side() {
879                OrderSide::Buy => matching_core.ask,
880                OrderSide::Sell => matching_core.bid,
881                _ => panic!("invalid `OrderSide`"),
882            };
883
884            // Generate event
885            let event = OrderReleased::new(
886                order.trader_id(),
887                order.strategy_id(),
888                order.instrument_id(),
889                order.client_order_id(),
890                released_price.unwrap(),
891                UUID4::new(),
892                self.clock.borrow().timestamp_ns(),
893                self.clock.borrow().timestamp_ns(),
894            );
895
896            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
897                log::error!("Failed to apply order event: {e}");
898            }
899
900            if let Err(e) = self
901                .cache
902                .borrow_mut()
903                .update_order(&OrderAny::Limit(transformed.clone()))
904            {
905                log::error!("Failed to update order: {e}");
906            }
907
908            self.manager.send_risk_event(OrderEventAny::Released(event));
909
910            log::info!("Releasing order {}", order.client_order_id());
911
912            // Publish event
913            msgbus::publish(
914                format!("events.order.{}", transformed.strategy_id()).into(),
915                &OrderEventAny::Released(event),
916            );
917
918            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
919                self.manager.send_algo_command(command, exec_algorithm_id);
920            } else {
921                self.manager
922                    .send_exec_command(TradingCommand::SubmitOrder(command));
923            }
924        } else {
925            log::error!(
926                "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
927            );
928        }
929    }
930
931    /// # Panics
932    ///
933    /// Panics if a market order command is missing.
934    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
935        // Fetch command
936        let mut command = match self
937            .manager
938            .pop_submit_order_command(order.client_order_id())
939        {
940            Some(command) => command,
941            None => panic!("invalid operation `_fill_market_order` with no command"),
942        };
943
944        let trigger_instrument_id = order
945            .trigger_instrument_id()
946            .unwrap_or(order.instrument_id());
947
948        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
949            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
950                log::error!("Cannot delete order: {e:?}");
951            }
952
953            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
954
955            // Transform order
956            let mut transformed = MarketOrder::new(
957                order.trader_id(),
958                order.strategy_id(),
959                order.instrument_id(),
960                order.client_order_id(),
961                order.order_side(),
962                order.quantity(),
963                order.time_in_force(),
964                UUID4::new(),
965                self.clock.borrow().timestamp_ns(),
966                order.is_reduce_only(),
967                order.is_quote_quantity(),
968                order.contingency_type(),
969                order.order_list_id(),
970                order.linked_order_ids().map(Vec::from),
971                order.parent_order_id(),
972                order.exec_algorithm_id(),
973                order.exec_algorithm_params().cloned(),
974                order.exec_spawn_id(),
975                order.tags().map(Vec::from),
976            );
977
978            let original_events = order.events();
979
980            for event in original_events {
981                transformed.events.insert(0, event.clone());
982            }
983
984            if let Err(e) = self.cache.borrow_mut().add_order(
985                OrderAny::Market(transformed.clone()),
986                command.position_id,
987                Some(command.client_id),
988                true,
989            ) {
990                log::error!("Failed to add order: {e}");
991            }
992
993            // Replace commands order with transformed order
994            command.order = OrderAny::Market(transformed.clone());
995
996            msgbus::publish(
997                format!("events.order.{}", order.strategy_id()).into(),
998                transformed.last_event(),
999            );
1000
1001            // Determine triggered price
1002            // TODO: fix unwraps
1003            let released_price = match order.order_side() {
1004                OrderSide::Buy => matching_core.ask,
1005                OrderSide::Sell => matching_core.bid,
1006                _ => panic!("invalid `OrderSide`"),
1007            };
1008
1009            // Generate event
1010            let ts_now = self.clock.borrow().timestamp_ns();
1011            let event = OrderReleased::new(
1012                order.trader_id(),
1013                order.strategy_id(),
1014                order.instrument_id(),
1015                order.client_order_id(),
1016                released_price.unwrap(),
1017                UUID4::new(),
1018                ts_now,
1019                ts_now,
1020            );
1021
1022            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1023                log::error!("Failed to apply order event: {e}");
1024            }
1025
1026            if let Err(e) = self
1027                .cache
1028                .borrow_mut()
1029                .update_order(&OrderAny::Market(transformed))
1030            {
1031                log::error!("Failed to update order: {e}");
1032            }
1033            self.manager.send_risk_event(OrderEventAny::Released(event));
1034
1035            log::info!("Releasing order {}", order.client_order_id());
1036
1037            // Publish event
1038            msgbus::publish(
1039                format!("events.order.{}", order.strategy_id()).into(),
1040                &OrderEventAny::Released(event),
1041            );
1042
1043            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1044                self.manager.send_algo_command(command, exec_algorithm_id);
1045            } else {
1046                self.manager
1047                    .send_exec_command(TradingCommand::SubmitOrder(command));
1048            }
1049        } else {
1050            log::error!(
1051                "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1052            );
1053        }
1054    }
1055
1056    #[allow(clippy::too_many_lines)]
1057    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1058        let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1059            log::error!(
1060                "Cannot update trailing-stop order: no matching core for instrument {}",
1061                order.instrument_id()
1062            );
1063            return;
1064        };
1065
1066        let mut bid = matching_core.bid;
1067        let mut ask = matching_core.ask;
1068        let mut last = matching_core.last;
1069
1070        if bid.is_none() || ask.is_none() || last.is_none() {
1071            if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1072                bid.get_or_insert(q.bid_price);
1073                ask.get_or_insert(q.ask_price);
1074            }
1075            if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1076                last.get_or_insert(t.price);
1077            }
1078        }
1079
1080        let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1081            matching_core.price_increment,
1082            order.trigger_price(),
1083            order.activation_price(),
1084            order,
1085            bid,
1086            ask,
1087            last,
1088        ) {
1089            Ok(pair) => pair,
1090            Err(e) => {
1091                log::warn!("Cannot calculate trailing-stop update: {e}");
1092                return;
1093            }
1094        };
1095
1096        if new_trigger_px.is_none() && new_limit_px.is_none() {
1097            return;
1098        }
1099
1100        let ts_now = self.clock.borrow().timestamp_ns();
1101        let update = OrderUpdated::new(
1102            order.trader_id(),
1103            order.strategy_id(),
1104            order.instrument_id(),
1105            order.client_order_id(),
1106            order.quantity(),
1107            UUID4::new(),
1108            ts_now,
1109            ts_now,
1110            false,
1111            order.venue_order_id(),
1112            order.account_id(),
1113            new_limit_px,
1114            new_trigger_px,
1115        );
1116        let wrapped = OrderEventAny::Updated(update);
1117        if let Err(e) = order.apply(wrapped.clone()) {
1118            log::error!("Failed to apply order event: {e}");
1119            return;
1120        }
1121        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1122            log::error!("Failed to update order in cache: {e}");
1123            return;
1124        }
1125        self.manager.send_risk_event(wrapped);
1126    }
1127}