nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::{HashMap, HashSet},
19    fmt::Debug,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    logging::{CMD, EVT, RECV},
27    messages::execution::{
28        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::UUID4;
33use nautilus_model::{
34    data::{OrderBookDeltas, QuoteTick, TradeTick},
35    enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
36    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38    instruments::Instrument,
39    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40    types::{Price, Quantity},
41};
42
43use crate::{
44    matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45    trailing::trailing_stop_calculate,
46};
47
48pub struct OrderEmulator {
49    clock: Rc<RefCell<dyn Clock>>,
50    cache: Rc<RefCell<Cache>>,
51    manager: OrderManager,
52    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
53    subscribed_quotes: HashSet<InstrumentId>,
54    subscribed_trades: HashSet<InstrumentId>,
55    subscribed_strategies: HashSet<StrategyId>,
56    monitored_positions: HashSet<PositionId>,
57    on_event_handler: Option<ShareableMessageHandler>,
58}
59
60impl Debug for OrderEmulator {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct(stringify!(OrderEmulator))
63            .field("cores", &self.matching_cores.len())
64            .field("subscribed_quotes", &self.subscribed_quotes.len())
65            .finish()
66    }
67}
68
69impl OrderEmulator {
70    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71        // TODO: Impl Actor Trait
72        // self.register_base(portfolio, msgbus, cache, clock);
73
74        let active_local = true;
75        let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77        Self {
78            clock,
79            cache,
80            manager,
81            matching_cores: HashMap::new(),
82            subscribed_quotes: HashSet::new(),
83            subscribed_trades: HashSet::new(),
84            subscribed_strategies: HashSet::new(),
85            monitored_positions: HashSet::new(),
86            on_event_handler: None,
87        }
88    }
89
90    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91        self.on_event_handler = Some(handler);
92    }
93
94    // TODO: WIP
95    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
96    //     self.manager.set_submit_order_handler(handler);
97    // }
98    //
99    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
100    //     self.manager.set_cancel_order_handler(handler);
101    // }
102    //
103    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
104    //     self.manager.set_modify_order_handler(handler);
105    // }
106
107    #[must_use]
108    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110        quotes.sort();
111        quotes
112    }
113
114    #[must_use]
115    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117        trades.sort();
118        trades
119    }
120
121    #[must_use]
122    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123        self.manager.get_submit_order_commands()
124    }
125
126    #[must_use]
127    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128        self.matching_cores.get(instrument_id).cloned()
129    }
130
131    /// Reactivates emulated orders from cache on start.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if no emulated orders are found or processing fails.
136    ///
137    /// # Panics
138    ///
139    /// Panics if a cached client ID cannot be unwrapped.
140    pub fn on_start(&mut self) -> anyhow::Result<()> {
141        let emulated_orders: Vec<OrderAny> = self
142            .cache
143            .borrow()
144            .orders_emulated(None, None, None, None)
145            .into_iter()
146            .cloned()
147            .collect();
148
149        if emulated_orders.is_empty() {
150            log::error!("No emulated orders to reactivate");
151            return Ok(());
152        }
153
154        for order in emulated_orders {
155            if !matches!(
156                order.status(),
157                OrderStatus::Initialized | OrderStatus::Emulated
158            ) {
159                continue; // No longer emulated
160            }
161
162            if let Some(parent_order_id) = &order.parent_order_id() {
163                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164                    order.clone()
165                } else {
166                    log::error!("Cannot handle order: parent {parent_order_id} not found");
167                    continue;
168                };
169
170                let is_position_closed = parent_order
171                    .position_id()
172                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173                if parent_order.is_closed() && is_position_closed {
174                    self.manager.cancel_order(&order);
175                    continue; // Parent already closed
176                }
177
178                if parent_order.contingency_type() == Some(ContingencyType::Oto)
179                    && (parent_order.is_active_local()
180                        || parent_order.filled_qty() == Quantity::zero(0))
181                {
182                    continue; // Process contingency order later once parent triggered
183                }
184            }
185
186            let position_id = self
187                .cache
188                .borrow()
189                .position_id(&order.client_order_id())
190                .copied();
191            let client_id = self
192                .cache
193                .borrow()
194                .client_id(&order.client_order_id())
195                .copied();
196
197            let command = SubmitOrder::new(
198                order.trader_id(),
199                client_id.unwrap(),
200                order.strategy_id(),
201                order.instrument_id(),
202                order.client_order_id(),
203                order.venue_order_id().unwrap(),
204                order.clone(),
205                order.exec_algorithm_id(),
206                position_id,
207                None, // params
208                UUID4::new(),
209                self.clock.borrow().timestamp_ns(),
210            )?;
211
212            self.handle_submit_order(command);
213        }
214
215        Ok(())
216    }
217
218    /// # Panics
219    ///
220    /// Panics if the order cannot be converted to a passive order.
221    pub fn on_event(&mut self, event: OrderEventAny) {
222        log::info!("{RECV}{EVT} {event}");
223
224        self.manager.handle_event(event.clone());
225
226        if let Some(order) = self.cache.borrow().order(&event.client_order_id())
227            && order.is_closed()
228            && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
229            && let Err(e) = matching_core.delete_order(
230                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
231            )
232        {
233            log::error!("Error deleting order: {e}");
234        }
235        // else: Order not in cache yet
236    }
237
238    pub const fn on_stop(&self) {}
239
240    pub fn on_reset(&mut self) {
241        self.manager.reset();
242        self.matching_cores.clear();
243    }
244
245    pub const fn on_dispose(&self) {}
246
247    pub fn execute(&mut self, command: TradingCommand) {
248        log::info!("{RECV}{CMD} {command}");
249
250        match command {
251            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
252            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
253            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
254            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
255            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
256            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
257        }
258    }
259
260    fn create_matching_core(
261        &mut self,
262        instrument_id: InstrumentId,
263        price_increment: Price,
264    ) -> OrderMatchingCore {
265        let matching_core =
266            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
267        self.matching_cores
268            .insert(instrument_id, matching_core.clone());
269        log::info!("Creating matching core for {instrument_id:?}");
270        matching_core
271    }
272
273    /// # Panics
274    ///
275    /// Panics if the emulation trigger type is `NoTrigger`.
276    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
277        let mut order = command.order.clone();
278        let emulation_trigger = order.emulation_trigger();
279
280        assert_ne!(
281            emulation_trigger,
282            Some(TriggerType::NoTrigger),
283            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
284        );
285        assert!(
286            self.manager
287                .get_submit_order_commands()
288                .contains_key(&order.client_order_id()),
289            "command.order.client_order_id must be in submit_order_commands"
290        );
291
292        if !matches!(
293            emulation_trigger,
294            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
295        ) {
296            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
297            self.manager.cancel_order(&order);
298            return;
299        }
300
301        self.check_monitoring(command.strategy_id, command.position_id);
302
303        // Get or create matching core
304        let trigger_instrument_id = order
305            .trigger_instrument_id()
306            .unwrap_or_else(|| order.instrument_id());
307
308        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
309
310        let mut matching_core = if let Some(core) = matching_core {
311            core
312        } else {
313            // Handle synthetic instruments
314            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
315                let synthetic = self
316                    .cache
317                    .borrow()
318                    .synthetic(&trigger_instrument_id)
319                    .cloned();
320                if let Some(synthetic) = synthetic {
321                    (synthetic.id, synthetic.price_increment)
322                } else {
323                    log::error!(
324                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
325                    );
326                    self.manager.cancel_order(&order);
327                    return;
328                }
329            } else {
330                let instrument = self
331                    .cache
332                    .borrow()
333                    .instrument(&trigger_instrument_id)
334                    .cloned();
335                if let Some(instrument) = instrument {
336                    (instrument.id(), instrument.price_increment())
337                } else {
338                    log::error!(
339                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
340                    );
341                    self.manager.cancel_order(&order);
342                    return;
343                }
344            };
345
346            self.create_matching_core(instrument_id, price_increment)
347        };
348
349        // Update trailing stop
350        if matches!(
351            order.order_type(),
352            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
353        ) {
354            self.update_trailing_stop_order(&mut order);
355            if order.trigger_price().is_none() {
356                log::error!(
357                    "Cannot handle trailing stop order with no trigger_price and no market updates"
358                );
359
360                self.manager.cancel_order(&order);
361                return;
362            }
363        }
364
365        // Cache command
366        self.manager.cache_submit_order_command(command);
367
368        // Check if immediately marketable
369        matching_core.match_order(
370            &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
371            true,
372        );
373
374        // Handle data subscriptions
375        match emulation_trigger.unwrap() {
376            TriggerType::Default | TriggerType::BidAsk => {
377                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
378                    if !trigger_instrument_id.is_synthetic() {
379                        // TODO: Impl Actor Trait
380                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
381                    }
382                    // TODO: Impl Actor Trait
383                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
384                    self.subscribed_quotes.insert(trigger_instrument_id);
385                }
386            }
387            TriggerType::LastPrice => {
388                if !self.subscribed_trades.contains(&trigger_instrument_id) {
389                    // TODO: Impl Actor Trait
390                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
391                    self.subscribed_trades.insert(trigger_instrument_id);
392                }
393            }
394            _ => {
395                log::error!("Invalid TriggerType: {emulation_trigger:?}");
396                return;
397            }
398        }
399
400        // Check if order was already released
401        if !self
402            .manager
403            .get_submit_order_commands()
404            .contains_key(&order.client_order_id())
405        {
406            return; // Already released
407        }
408
409        // Hold in matching core
410        if let Err(e) = matching_core
411            .add_order(PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"))
412        {
413            log::error!("Cannot add order: {e:?}");
414            return;
415        }
416
417        // Generate emulated event if needed
418        if order.status() == OrderStatus::Initialized {
419            let event = OrderEmulated::new(
420                order.trader_id(),
421                order.strategy_id(),
422                order.instrument_id(),
423                order.client_order_id(),
424                UUID4::new(),
425                self.clock.borrow().timestamp_ns(),
426                self.clock.borrow().timestamp_ns(),
427            );
428
429            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
430                log::error!("Cannot apply order event: {e:?}");
431                return;
432            }
433
434            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
435                log::error!("Cannot update order: {e:?}");
436                return;
437            }
438
439            self.manager.send_risk_event(OrderEventAny::Emulated(event));
440
441            msgbus::publish(
442                format!("events.order.{}", order.strategy_id()).into(),
443                &OrderEventAny::Emulated(event),
444            );
445        }
446
447        // Since we are cloning the matching core, we need to insert it back into the original hashmap
448        self.matching_cores
449            .insert(trigger_instrument_id, matching_core);
450
451        log::info!("Emulating {order}");
452    }
453
454    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
455        self.check_monitoring(command.strategy_id, command.position_id);
456
457        for order in &command.order_list.orders {
458            if let Some(parent_order_id) = order.parent_order_id() {
459                let cache = self.cache.borrow();
460                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
461                    parent_order
462                } else {
463                    log::error!("Parent order for {} not found", order.client_order_id());
464                    continue;
465                };
466
467                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
468                    continue; // Process contingency order later once parent triggered
469                }
470            }
471
472            if let Err(e) = self.manager.create_new_submit_order(
473                order,
474                command.position_id,
475                Some(command.client_id),
476            ) {
477                log::error!("Error creating new submit order: {e}");
478            }
479        }
480    }
481
482    fn handle_modify_order(&mut self, command: ModifyOrder) {
483        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
484            let price = match command.price {
485                Some(price) => Some(price),
486                None => order.price(),
487            };
488
489            let trigger_price = match command.trigger_price {
490                Some(trigger_price) => Some(trigger_price),
491                None => order.trigger_price(),
492            };
493
494            // Generate event
495            let ts_now = self.clock.borrow().timestamp_ns();
496            let event = OrderUpdated::new(
497                order.trader_id(),
498                order.strategy_id(),
499                order.instrument_id(),
500                order.client_order_id(),
501                command.quantity.unwrap_or(order.quantity()),
502                UUID4::new(),
503                ts_now,
504                ts_now,
505                false,
506                order.venue_order_id(),
507                order.account_id(),
508                price,
509                trigger_price,
510                None,
511            );
512
513            self.manager.send_exec_event(OrderEventAny::Updated(event));
514
515            let trigger_instrument_id = order
516                .trigger_instrument_id()
517                .unwrap_or_else(|| order.instrument_id());
518
519            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
520                matching_core.match_order(
521                    &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
522                    false,
523                );
524            } else {
525                log::error!(
526                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
527                );
528            }
529        } else {
530            log::error!("Cannot modify order: {} not found", command.client_order_id);
531        }
532    }
533
534    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
535        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
536            order.clone()
537        } else {
538            log::error!("Cannot cancel order: {} not found", command.client_order_id);
539            return;
540        };
541
542        let trigger_instrument_id = order
543            .trigger_instrument_id()
544            .unwrap_or_else(|| order.instrument_id());
545
546        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
547            core
548        } else {
549            self.manager.cancel_order(&order);
550            return;
551        };
552
553        if !matching_core.order_exists(order.client_order_id())
554            && order.is_open()
555            && !order.is_pending_cancel()
556        {
557            // Order not held in the emulator
558            self.manager
559                .send_exec_command(TradingCommand::CancelOrder(command));
560        } else {
561            self.manager.cancel_order(&order);
562        }
563    }
564
565    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
566        let instrument_id = command.instrument_id;
567        let matching_core = match self.matching_cores.get(&instrument_id) {
568            Some(core) => core,
569            None => return, // No orders to cancel
570        };
571
572        let orders_to_cancel = match command.order_side {
573            OrderSide::NoOrderSide => {
574                // Get both bid and ask orders
575                let mut all_orders = Vec::new();
576                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
577                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
578                all_orders
579            }
580            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
581            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
582        };
583
584        // Process all orders in a single iteration
585        for order in orders_to_cancel {
586            self.manager.cancel_order(&OrderAny::from(order));
587        }
588    }
589
590    // --------------------------------------------------------------------------------------------
591
592    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
593        log::info!(
594            "Updating order {} quantity to {new_quantity}",
595            order.client_order_id(),
596        );
597
598        // Generate event
599        let ts_now = self.clock.borrow().timestamp_ns();
600        let event = OrderUpdated::new(
601            order.trader_id(),
602            order.strategy_id(),
603            order.instrument_id(),
604            order.client_order_id(),
605            new_quantity,
606            UUID4::new(),
607            ts_now,
608            ts_now,
609            false,
610            None,
611            order.account_id(),
612            None,
613            None,
614            None,
615        );
616
617        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
618            log::error!("Cannot apply order event: {e:?}");
619            return;
620        }
621        if let Err(e) = self.cache.borrow_mut().update_order(order) {
622            log::error!("Cannot update order: {e:?}");
623            return;
624        }
625
626        self.manager.send_risk_event(OrderEventAny::Updated(event));
627    }
628
629    // -----------------------------------------------------------------------------------------------
630
631    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
632        log::debug!("Processing {deltas:?}");
633
634        let instrument_id = &deltas.instrument_id;
635        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
636            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
637                let best_bid = book.best_bid_price();
638                let best_ask = book.best_ask_price();
639
640                if let Some(best_bid) = best_bid {
641                    matching_core.set_bid_raw(best_bid);
642                }
643
644                if let Some(best_ask) = best_ask {
645                    matching_core.set_ask_raw(best_ask);
646                }
647            } else {
648                log::error!(
649                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
650                    deltas.instrument_id
651                );
652            }
653
654            self.iterate_orders(instrument_id);
655        } else {
656            log::error!(
657                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
658                deltas.instrument_id
659            );
660        }
661    }
662
663    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
664        log::debug!("Processing {quote}:?");
665
666        let instrument_id = &quote.instrument_id;
667        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
668            matching_core.set_bid_raw(quote.bid_price);
669            matching_core.set_ask_raw(quote.ask_price);
670
671            self.iterate_orders(instrument_id);
672        } else {
673            log::error!(
674                "Cannot handle `QuoteTick`: no matching core for instrument {}",
675                quote.instrument_id
676            );
677        }
678    }
679
680    pub fn on_trade_tick(&mut self, trade: TradeTick) {
681        log::debug!("Processing {trade:?}");
682
683        let instrument_id = &trade.instrument_id;
684        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
685            matching_core.set_last_raw(trade.price);
686            if !self.subscribed_quotes.contains(instrument_id) {
687                matching_core.set_bid_raw(trade.price);
688                matching_core.set_ask_raw(trade.price);
689            }
690
691            self.iterate_orders(instrument_id);
692        } else {
693            log::error!(
694                "Cannot handle `TradeTick`: no matching core for instrument {}",
695                trade.instrument_id
696            );
697        }
698    }
699
700    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
701        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
702            matching_core.iterate();
703
704            matching_core.get_orders()
705        } else {
706            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
707            return;
708        };
709
710        for order in orders {
711            if order.is_closed() {
712                continue;
713            }
714
715            let mut order: OrderAny = order.clone().into();
716            if matches!(
717                order.order_type(),
718                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
719            ) {
720                self.update_trailing_stop_order(&mut order);
721            }
722        }
723    }
724
725    /// # Panics
726    ///
727    /// Panics if the order cannot be converted to a passive order.
728    pub fn cancel_order(&mut self, order: &OrderAny) {
729        log::info!("Canceling order {}", order.client_order_id());
730
731        let mut order = order.clone();
732        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
733
734        let trigger_instrument_id = order
735            .trigger_instrument_id()
736            .unwrap_or(order.instrument_id());
737
738        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
739            && let Err(e) = matching_core.delete_order(
740                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
741            )
742        {
743            log::error!("Cannot delete order: {e:?}");
744        }
745
746        self.cache
747            .borrow_mut()
748            .update_order_pending_cancel_local(&order);
749
750        // Generate event
751        let ts_now = self.clock.borrow().timestamp_ns();
752        let event = OrderCanceled::new(
753            order.trader_id(),
754            order.strategy_id(),
755            order.instrument_id(),
756            order.client_order_id(),
757            UUID4::new(),
758            ts_now,
759            ts_now,
760            false,
761            order.venue_order_id(),
762            order.account_id(),
763        );
764
765        self.manager.send_exec_event(OrderEventAny::Canceled(event));
766    }
767
768    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
769        if !self.subscribed_strategies.contains(&strategy_id) {
770            // Subscribe to all strategy events
771            if let Some(handler) = &self.on_event_handler {
772                msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
773                msgbus::subscribe_str(
774                    format!("events.position.{strategy_id}"),
775                    handler.clone(),
776                    None,
777                );
778                self.subscribed_strategies.insert(strategy_id);
779                log::info!("Subscribed to strategy {strategy_id} order and position events");
780            }
781        }
782
783        if let Some(position_id) = position_id
784            && !self.monitored_positions.contains(&position_id)
785        {
786            self.monitored_positions.insert(position_id);
787        }
788    }
789
790    /// Validates market data availability for order release.
791    ///
792    /// Returns `Some(released_price)` if market data is available, `None` otherwise.
793    /// Logs appropriate warnings when market data is not yet available.
794    ///
795    /// Does NOT pop the submit order command - caller must do that and handle missing command
796    /// according to their contract (panic for market orders, return for limit orders).
797    fn validate_release(
798        &self,
799        order: &OrderAny,
800        matching_core: &OrderMatchingCore,
801        trigger_instrument_id: InstrumentId,
802    ) -> Option<Price> {
803        let released_price = match order.order_side_specified() {
804            OrderSideSpecified::Buy => matching_core.ask,
805            OrderSideSpecified::Sell => matching_core.bid,
806        };
807
808        if released_price.is_none() {
809            log::warn!(
810                "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
811                order.client_order_id(),
812            );
813            return None;
814        }
815
816        Some(released_price.unwrap())
817    }
818
819    /// # Panics
820    ///
821    /// Panics if the order type is invalid for a stop order.
822    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
823        match order.order_type() {
824            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
825                self.fill_limit_order(order);
826            }
827            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
828                self.fill_market_order(order);
829            }
830            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
831        }
832    }
833
834    /// # Panics
835    ///
836    /// Panics if a limit order has no price.
837    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
838        if matches!(order.order_type(), OrderType::Limit) {
839            self.fill_market_order(order);
840            return;
841        }
842
843        let trigger_instrument_id = order
844            .trigger_instrument_id()
845            .unwrap_or(order.instrument_id());
846
847        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
848            Some(core) => core,
849            None => {
850                log::error!(
851                    "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
852                );
853                return; // Order stays queued for retry
854            }
855        };
856
857        let released_price =
858            match self.validate_release(order, matching_core, trigger_instrument_id) {
859                Some(price) => price,
860                None => return, // Order stays queued for retry
861            };
862
863        let mut command = match self
864            .manager
865            .pop_submit_order_command(order.client_order_id())
866        {
867            Some(command) => command,
868            None => return, // Order already released
869        };
870
871        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
872            if let Err(e) = matching_core.delete_order(
873                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
874            ) {
875                log::error!("Error deleting order: {e:?}");
876            }
877
878            let emulation_trigger = TriggerType::NoTrigger;
879
880            // Transform order
881            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
882                order.trader_id(),
883                order.strategy_id(),
884                order.instrument_id(),
885                order.client_order_id(),
886                order.order_side(),
887                order.quantity(),
888                order.price().unwrap(),
889                order.time_in_force(),
890                order.expire_time(),
891                order.is_post_only(),
892                order.is_reduce_only(),
893                order.is_quote_quantity(),
894                order.display_qty(),
895                Some(emulation_trigger),
896                Some(trigger_instrument_id),
897                order.contingency_type(),
898                order.order_list_id(),
899                order.linked_order_ids().map(Vec::from),
900                order.parent_order_id(),
901                order.exec_algorithm_id(),
902                order.exec_algorithm_params().cloned(),
903                order.exec_spawn_id(),
904                order.tags().map(Vec::from),
905                UUID4::new(),
906                self.clock.borrow().timestamp_ns(),
907            ) {
908                transformed
909            } else {
910                log::error!("Cannot create limit order");
911                return;
912            };
913            transformed.liquidity_side = order.liquidity_side();
914
915            // TODO: fix
916            // let triggered_price = order.trigger_price();
917            // if triggered_price.is_some() {
918            //     transformed.trigger_price() = (triggered_price.unwrap());
919            // }
920
921            let original_events = order.events();
922
923            for event in original_events {
924                transformed.events.insert(0, event.clone());
925            }
926
927            if let Err(e) = self.cache.borrow_mut().add_order(
928                OrderAny::Limit(transformed.clone()),
929                command.position_id,
930                Some(command.client_id),
931                true,
932            ) {
933                log::error!("Failed to add order: {e}");
934            }
935
936            // Replace commands order with transformed order
937            command.order = OrderAny::Limit(transformed.clone());
938
939            msgbus::publish(
940                format!("events.order.{}", order.strategy_id()).into(),
941                transformed.last_event(),
942            );
943
944            // Generate event
945            let event = OrderReleased::new(
946                order.trader_id(),
947                order.strategy_id(),
948                order.instrument_id(),
949                order.client_order_id(),
950                released_price,
951                UUID4::new(),
952                self.clock.borrow().timestamp_ns(),
953                self.clock.borrow().timestamp_ns(),
954            );
955
956            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
957                log::error!("Failed to apply order event: {e}");
958            }
959
960            if let Err(e) = self
961                .cache
962                .borrow_mut()
963                .update_order(&OrderAny::Limit(transformed.clone()))
964            {
965                log::error!("Failed to update order: {e}");
966            }
967
968            self.manager.send_risk_event(OrderEventAny::Released(event));
969
970            log::info!("Releasing order {}", order.client_order_id());
971
972            // Publish event
973            msgbus::publish(
974                format!("events.order.{}", transformed.strategy_id()).into(),
975                &OrderEventAny::Released(event),
976            );
977
978            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
979                self.manager.send_algo_command(command, exec_algorithm_id);
980            } else {
981                self.manager
982                    .send_exec_command(TradingCommand::SubmitOrder(command));
983            }
984        }
985    }
986
987    /// # Panics
988    ///
989    /// Panics if a market order command is missing.
990    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
991        let trigger_instrument_id = order
992            .trigger_instrument_id()
993            .unwrap_or(order.instrument_id());
994
995        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
996            Some(core) => core,
997            None => {
998                log::error!(
999                    "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
1000                );
1001                return; // Order stays queued for retry
1002            }
1003        };
1004
1005        let released_price =
1006            match self.validate_release(order, matching_core, trigger_instrument_id) {
1007                Some(price) => price,
1008                None => return, // Order stays queued for retry
1009            };
1010
1011        let mut command = self
1012            .manager
1013            .pop_submit_order_command(order.client_order_id())
1014            .expect("invalid operation `fill_market_order` with no command");
1015
1016        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1017            if let Err(e) = matching_core.delete_order(
1018                &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1019            ) {
1020                log::error!("Cannot delete order: {e:?}");
1021            }
1022
1023            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1024
1025            // Transform order
1026            let mut transformed = MarketOrder::new(
1027                order.trader_id(),
1028                order.strategy_id(),
1029                order.instrument_id(),
1030                order.client_order_id(),
1031                order.order_side(),
1032                order.quantity(),
1033                order.time_in_force(),
1034                UUID4::new(),
1035                self.clock.borrow().timestamp_ns(),
1036                order.is_reduce_only(),
1037                order.is_quote_quantity(),
1038                order.contingency_type(),
1039                order.order_list_id(),
1040                order.linked_order_ids().map(Vec::from),
1041                order.parent_order_id(),
1042                order.exec_algorithm_id(),
1043                order.exec_algorithm_params().cloned(),
1044                order.exec_spawn_id(),
1045                order.tags().map(Vec::from),
1046            );
1047
1048            let original_events = order.events();
1049
1050            for event in original_events {
1051                transformed.events.insert(0, event.clone());
1052            }
1053
1054            if let Err(e) = self.cache.borrow_mut().add_order(
1055                OrderAny::Market(transformed.clone()),
1056                command.position_id,
1057                Some(command.client_id),
1058                true,
1059            ) {
1060                log::error!("Failed to add order: {e}");
1061            }
1062
1063            // Replace commands order with transformed order
1064            command.order = OrderAny::Market(transformed.clone());
1065
1066            msgbus::publish(
1067                format!("events.order.{}", order.strategy_id()).into(),
1068                transformed.last_event(),
1069            );
1070
1071            // Generate event
1072            let ts_now = self.clock.borrow().timestamp_ns();
1073            let event = OrderReleased::new(
1074                order.trader_id(),
1075                order.strategy_id(),
1076                order.instrument_id(),
1077                order.client_order_id(),
1078                released_price,
1079                UUID4::new(),
1080                ts_now,
1081                ts_now,
1082            );
1083
1084            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1085                log::error!("Failed to apply order event: {e}");
1086            }
1087
1088            if let Err(e) = self
1089                .cache
1090                .borrow_mut()
1091                .update_order(&OrderAny::Market(transformed))
1092            {
1093                log::error!("Failed to update order: {e}");
1094            }
1095            self.manager.send_risk_event(OrderEventAny::Released(event));
1096
1097            log::info!("Releasing order {}", order.client_order_id());
1098
1099            // Publish event
1100            msgbus::publish(
1101                format!("events.order.{}", order.strategy_id()).into(),
1102                &OrderEventAny::Released(event),
1103            );
1104
1105            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1106                self.manager.send_algo_command(command, exec_algorithm_id);
1107            } else {
1108                self.manager
1109                    .send_exec_command(TradingCommand::SubmitOrder(command));
1110            }
1111        }
1112    }
1113
1114    #[allow(clippy::too_many_lines)]
1115    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1116        let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1117            log::error!(
1118                "Cannot update trailing-stop order: no matching core for instrument {}",
1119                order.instrument_id()
1120            );
1121            return;
1122        };
1123
1124        let mut bid = matching_core.bid;
1125        let mut ask = matching_core.ask;
1126        let mut last = matching_core.last;
1127
1128        if bid.is_none() || ask.is_none() || last.is_none() {
1129            if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1130                bid.get_or_insert(q.bid_price);
1131                ask.get_or_insert(q.ask_price);
1132            }
1133            if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1134                last.get_or_insert(t.price);
1135            }
1136        }
1137
1138        let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1139            matching_core.price_increment,
1140            order.trigger_price(),
1141            order.activation_price(),
1142            order,
1143            bid,
1144            ask,
1145            last,
1146        ) {
1147            Ok(pair) => pair,
1148            Err(e) => {
1149                log::warn!("Cannot calculate trailing-stop update: {e}");
1150                return;
1151            }
1152        };
1153
1154        if new_trigger_px.is_none() && new_limit_px.is_none() {
1155            return;
1156        }
1157
1158        let ts_now = self.clock.borrow().timestamp_ns();
1159        let update = OrderUpdated::new(
1160            order.trader_id(),
1161            order.strategy_id(),
1162            order.instrument_id(),
1163            order.client_order_id(),
1164            order.quantity(),
1165            UUID4::new(),
1166            ts_now,
1167            ts_now,
1168            false,
1169            order.venue_order_id(),
1170            order.account_id(),
1171            new_limit_px,
1172            new_trigger_px,
1173            None,
1174        );
1175        let wrapped = OrderEventAny::Updated(update);
1176        if let Err(e) = order.apply(wrapped.clone()) {
1177            log::error!("Failed to apply order event: {e}");
1178            return;
1179        }
1180        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1181            log::error!("Failed to update order in cache: {e}");
1182            return;
1183        }
1184        self.manager.send_risk_event(wrapped);
1185    }
1186}