nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::{HashMap, HashSet},
19    rc::Rc,
20};
21
22use anyhow::Result;
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    logging::{CMD, EVT, RECV},
27    msgbus::{MessageBus, handler::ShareableMessageHandler},
28};
29use nautilus_core::uuid::UUID4;
30use nautilus_model::{
31    data::{OrderBookDeltas, QuoteTick, TradeTick},
32    enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
33    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
34    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
35    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
36    types::{Price, Quantity},
37};
38
39use crate::{
40    matching_core::OrderMatchingCore,
41    messages::{
42        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
43        cancel::CancelOrderHandlerAny, modify::ModifyOrderHandlerAny,
44        submit::SubmitOrderHandlerAny,
45    },
46    order_manager::manager::OrderManager,
47    trailing::trailing_stop_calculate,
48};
49
50pub struct OrderEmulator {
51    clock: Rc<RefCell<dyn Clock>>,
52    cache: Rc<RefCell<Cache>>,
53    msgbus: Rc<RefCell<MessageBus>>,
54    manager: OrderManager,
55    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
56    subscribed_quotes: HashSet<InstrumentId>,
57    subscribed_trades: HashSet<InstrumentId>,
58    subscribed_strategies: HashSet<StrategyId>,
59    monitored_positions: HashSet<PositionId>,
60    on_event_handler: Option<ShareableMessageHandler>,
61}
62
63impl OrderEmulator {
64    pub fn new(
65        clock: Rc<RefCell<dyn Clock>>,
66        cache: Rc<RefCell<Cache>>,
67        msgbus: Rc<RefCell<MessageBus>>,
68    ) -> Self {
69        // TODO: Impl Actor Trait
70        // self.register_base(portfolio, msgbus, cache, clock);
71
72        let active_local = true;
73        let manager = OrderManager::new(
74            clock.clone(),
75            msgbus.clone(),
76            cache.clone(),
77            active_local,
78            None,
79            None,
80            None,
81        );
82
83        Self {
84            clock,
85            cache,
86            msgbus,
87            manager,
88            matching_cores: HashMap::new(),
89            subscribed_quotes: HashSet::new(),
90            subscribed_trades: HashSet::new(),
91            subscribed_strategies: HashSet::new(),
92            monitored_positions: HashSet::new(),
93            on_event_handler: None,
94        }
95    }
96
97    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
98        self.on_event_handler = Some(handler);
99    }
100
101    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
102        self.manager.set_submit_order_handler(handler);
103    }
104
105    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
106        self.manager.set_cancel_order_handler(handler);
107    }
108
109    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
110        self.manager.set_modify_order_handler(handler);
111    }
112
113    #[must_use]
114    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
115        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
116        quotes.sort();
117        quotes
118    }
119
120    #[must_use]
121    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
122        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
123        trades.sort();
124        trades
125    }
126
127    #[must_use]
128    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
129        self.manager.get_submit_order_commands()
130    }
131
132    #[must_use]
133    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
134        self.matching_cores.get(instrument_id).cloned()
135    }
136
137    pub fn on_start(&mut self) -> Result<()> {
138        let emulated_orders: Vec<OrderAny> = self
139            .cache
140            .borrow()
141            .orders_emulated(None, None, None, None)
142            .into_iter()
143            .cloned()
144            .collect();
145
146        if emulated_orders.is_empty() {
147            log::error!("No emulated orders to reactivate");
148            return Ok(());
149        }
150
151        for order in emulated_orders {
152            if !matches!(
153                order.status(),
154                OrderStatus::Initialized | OrderStatus::Emulated
155            ) {
156                continue; // No longer emulated
157            }
158
159            if let Some(parent_order_id) = &order.parent_order_id() {
160                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
161                    order.clone()
162                } else {
163                    log::error!("Cannot handle order: parent {parent_order_id} not found");
164                    continue;
165                };
166
167                let is_position_closed = parent_order
168                    .position_id()
169                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
170                if parent_order.is_closed() && is_position_closed {
171                    self.manager.cancel_order(&order);
172                    continue; // Parent already closed
173                }
174
175                if parent_order.contingency_type() == Some(ContingencyType::Oto)
176                    && (parent_order.is_active_local()
177                        || parent_order.filled_qty() == Quantity::zero(0))
178                {
179                    continue; // Process contingency order later once parent triggered
180                }
181            }
182
183            let position_id = self
184                .cache
185                .borrow()
186                .position_id(&order.client_order_id())
187                .copied();
188            let client_id = self
189                .cache
190                .borrow()
191                .client_id(&order.client_order_id())
192                .copied();
193
194            let command = SubmitOrder::new(
195                order.trader_id(),
196                client_id.unwrap(),
197                order.strategy_id(),
198                order.instrument_id(),
199                order.client_order_id(),
200                order.venue_order_id().unwrap(),
201                order.clone(),
202                order.exec_algorithm_id(),
203                position_id,
204                UUID4::new(),
205                self.clock.borrow().timestamp_ns(),
206            )?;
207
208            self.handle_submit_order(command);
209        }
210
211        Ok(())
212    }
213
214    pub fn on_event(&mut self, event: OrderEventAny) {
215        log::info!("{RECV}{EVT} {event}");
216
217        self.manager.handle_event(event.clone());
218
219        if let Some(order) = self.cache.borrow().order(&event.client_order_id()) {
220            if order.is_closed() {
221                if let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id()) {
222                    if let Err(e) =
223                        matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
224                    {
225                        log::error!("Error deleting order: {}", e);
226                    }
227                }
228            }
229        }
230        // else: Order not in cache yet
231    }
232
233    pub const fn on_stop(&self) {}
234
235    pub fn on_reset(&mut self) {
236        self.manager.reset();
237        self.matching_cores.clear();
238    }
239
240    pub const fn on_dispose(&self) {}
241
242    pub fn execute(&mut self, command: TradingCommand) {
243        log::info!("{RECV}{CMD} {command}");
244
245        match command {
246            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
247            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
248            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
249            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
250            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
251            _ => log::error!("Cannot handle command: unrecognized {:?}", command),
252        }
253    }
254
255    fn create_matching_core(
256        &mut self,
257        instrument_id: InstrumentId,
258        price_increment: Price,
259    ) -> OrderMatchingCore {
260        let matching_core =
261            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
262        self.matching_cores
263            .insert(instrument_id, matching_core.clone());
264        log::info!("Creating matching core for {:?}", instrument_id);
265        matching_core
266    }
267
268    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
269        let mut order = command.order.clone();
270        let emulation_trigger = order.emulation_trigger();
271
272        assert_ne!(
273            emulation_trigger,
274            Some(TriggerType::NoTrigger),
275            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
276        );
277        assert!(
278            self.manager
279                .get_submit_order_commands()
280                .contains_key(&order.client_order_id()),
281            "command.order.client_order_id must be in submit_order_commands"
282        );
283
284        if !matches!(
285            emulation_trigger,
286            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
287        ) {
288            log::error!(
289                "Cannot emulate order: `TriggerType` {:?} not supported",
290                emulation_trigger
291            );
292            self.manager.cancel_order(&order);
293            return;
294        }
295
296        self.check_monitoring(command.strategy_id, command.position_id);
297
298        // Get or create matching core
299        let trigger_instrument_id = order
300            .trigger_instrument_id()
301            .unwrap_or_else(|| order.instrument_id());
302
303        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
304
305        let mut matching_core = if let Some(core) = matching_core {
306            core
307        } else {
308            // Handle synthetic instruments
309            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
310                let synthetic = self
311                    .cache
312                    .borrow()
313                    .synthetic(&trigger_instrument_id)
314                    .cloned();
315                if let Some(synthetic) = synthetic {
316                    (synthetic.id, synthetic.price_increment)
317                } else {
318                    log::error!(
319                        "Cannot emulate order: no synthetic instrument {} for trigger",
320                        trigger_instrument_id
321                    );
322                    self.manager.cancel_order(&order);
323                    return;
324                }
325            } else {
326                let instrument = self
327                    .cache
328                    .borrow()
329                    .instrument(&trigger_instrument_id)
330                    .cloned();
331                if let Some(instrument) = instrument {
332                    (instrument.id(), instrument.price_increment())
333                } else {
334                    log::error!(
335                        "Cannot emulate order: no instrument {} for trigger",
336                        trigger_instrument_id
337                    );
338                    self.manager.cancel_order(&order);
339                    return;
340                }
341            };
342
343            self.create_matching_core(instrument_id, price_increment)
344        };
345
346        // Update trailing stop
347        if matches!(
348            order.order_type(),
349            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
350        ) {
351            self.update_trailing_stop_order(&mut order);
352            if order.trigger_price().is_none() {
353                log::error!(
354                    "Cannot handle trailing stop order with no trigger_price and no market updates"
355                );
356
357                self.manager.cancel_order(&order);
358                return;
359            }
360        }
361
362        // Cache command
363        self.manager.cache_submit_order_command(command);
364
365        // Check if immediately marketable
366        matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
367
368        // Handle data subscriptions
369        match emulation_trigger.unwrap() {
370            TriggerType::Default | TriggerType::BidAsk => {
371                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
372                    if !trigger_instrument_id.is_synthetic() {
373                        // TODO: Impl Actor Trait
374                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
375                    }
376                    // TODO: Impl Actor Trait
377                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
378                    self.subscribed_quotes.insert(trigger_instrument_id);
379                }
380            }
381            TriggerType::LastPrice => {
382                if !self.subscribed_trades.contains(&trigger_instrument_id) {
383                    // TODO: Impl Actor Trait
384                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
385                    self.subscribed_trades.insert(trigger_instrument_id);
386                }
387            }
388            _ => {
389                log::error!("Invalid TriggerType: {:?}", emulation_trigger);
390                return;
391            }
392        }
393
394        // Check if order was already released
395        if !self
396            .manager
397            .get_submit_order_commands()
398            .contains_key(&order.client_order_id())
399        {
400            return; // Already released
401        }
402
403        // Hold in matching core
404        if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
405            log::error!("Cannot add order: {:?}", e);
406            return;
407        }
408
409        // Generate emulated event if needed
410        if order.status() == OrderStatus::Initialized {
411            let event = OrderEmulated::new(
412                order.trader_id(),
413                order.strategy_id(),
414                order.instrument_id(),
415                order.client_order_id(),
416                UUID4::new(),
417                self.clock.borrow().timestamp_ns(),
418                self.clock.borrow().timestamp_ns(),
419            );
420
421            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
422                log::error!("Cannot apply order event: {:?}", e);
423                return;
424            }
425
426            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
427                log::error!("Cannot update order: {:?}", e);
428                return;
429            }
430
431            self.manager.send_risk_event(OrderEventAny::Emulated(event));
432
433            self.msgbus.borrow().publish(
434                &format!("events.order.{}", order.strategy_id()).into(),
435                &OrderEventAny::Emulated(event),
436            );
437        }
438
439        // Since we are cloning the matching core, we need to insert it back into the original hashmap
440        self.matching_cores
441            .insert(trigger_instrument_id, matching_core);
442
443        log::info!("Emulating {}", order);
444    }
445
446    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
447        self.check_monitoring(command.strategy_id, command.position_id);
448
449        for order in &command.order_list.orders {
450            if let Some(parent_order_id) = order.parent_order_id() {
451                let cache = self.cache.borrow();
452                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
453                    parent_order
454                } else {
455                    log::error!("Parent order for {} not found", order.client_order_id());
456                    continue;
457                };
458
459                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
460                    continue; // Process contingency order later once parent triggered
461                }
462            }
463
464            if let Err(e) = self.manager.create_new_submit_order(
465                order,
466                command.position_id,
467                Some(command.client_id),
468            ) {
469                log::error!("Error creating new submit order: {}", e);
470            }
471        }
472    }
473
474    fn handle_modify_order(&mut self, command: ModifyOrder) {
475        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
476            let price = match command.price {
477                Some(price) => Some(price),
478                None => order.price(),
479            };
480
481            let trigger_price = match command.trigger_price {
482                Some(trigger_price) => Some(trigger_price),
483                None => order.trigger_price(),
484            };
485
486            // Generate event
487            let ts_now = self.clock.borrow().timestamp_ns();
488            let event = OrderUpdated::new(
489                order.trader_id(),
490                order.strategy_id(),
491                order.instrument_id(),
492                order.client_order_id(),
493                command.quantity.unwrap_or(order.quantity()),
494                UUID4::new(),
495                ts_now,
496                ts_now,
497                false,
498                order.venue_order_id(),
499                order.account_id(),
500                price,
501                trigger_price,
502            );
503
504            self.manager.send_exec_event(OrderEventAny::Updated(event));
505
506            let trigger_instrument_id = order
507                .trigger_instrument_id()
508                .unwrap_or_else(|| order.instrument_id());
509
510            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
511                matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
512            } else {
513                log::error!(
514                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {}",
515                    trigger_instrument_id
516                );
517            }
518        } else {
519            log::error!("Cannot modify order: {} not found", command.client_order_id);
520        }
521    }
522
523    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
524        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
525            order.clone()
526        } else {
527            log::error!("Cannot cancel order: {} not found", command.client_order_id);
528            return;
529        };
530
531        let trigger_instrument_id = order
532            .trigger_instrument_id()
533            .unwrap_or_else(|| order.instrument_id());
534
535        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
536            core
537        } else {
538            self.manager.cancel_order(&order);
539            return;
540        };
541
542        if !matching_core.order_exists(order.client_order_id())
543            && order.is_open()
544            && !order.is_pending_cancel()
545        {
546            // Order not held in the emulator
547            self.manager
548                .send_exec_command(TradingCommand::CancelOrder(command));
549        } else {
550            self.manager.cancel_order(&order);
551        }
552    }
553
554    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
555        let matching_core = match self.matching_cores.get(&command.instrument_id) {
556            Some(core) => core,
557            None => return, // No orders to cancel
558        };
559
560        let orders_to_cancel = match command.order_side {
561            OrderSide::NoOrderSide => {
562                // Get both bid and ask orders
563                let mut all_orders = Vec::new();
564                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
565                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
566                all_orders
567            }
568            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
569            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
570        };
571
572        // Process all orders in a single iteration
573        for order in orders_to_cancel {
574            self.manager.cancel_order(&OrderAny::from(order));
575        }
576    }
577
578    // --------------------------------------------------------------------------------------------
579
580    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
581        log::info!(
582            "Updating order {} quantity to {}",
583            order.client_order_id(),
584            new_quantity
585        );
586
587        // Generate event
588        let ts_now = self.clock.borrow().timestamp_ns();
589        let event = OrderUpdated::new(
590            order.trader_id(),
591            order.strategy_id(),
592            order.instrument_id(),
593            order.client_order_id(),
594            new_quantity,
595            UUID4::new(),
596            ts_now,
597            ts_now,
598            false,
599            None,
600            order.account_id(),
601            None,
602            None,
603        );
604
605        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
606            log::error!("Cannot apply order event: {:?}", e);
607            return;
608        }
609        if let Err(e) = self.cache.borrow_mut().update_order(order) {
610            log::error!("Cannot update order: {:?}", e);
611            return;
612        }
613
614        self.manager.send_risk_event(OrderEventAny::Updated(event));
615    }
616
617    // -----------------------------------------------------------------------------------------------
618
619    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
620        log::debug!("Processing OrderBookDeltas:{}", deltas);
621
622        let instrument_id = &deltas.instrument_id;
623        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
624            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
625                let best_bid = book.best_bid_price();
626                let best_ask = book.best_ask_price();
627
628                if let Some(best_bid) = best_bid {
629                    matching_core.set_bid_raw(best_bid);
630                }
631
632                if let Some(best_ask) = best_ask {
633                    matching_core.set_ask_raw(best_ask);
634                }
635            } else {
636                log::error!(
637                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
638                    deltas.instrument_id
639                );
640            }
641
642            self.iterate_orders(instrument_id);
643        } else {
644            log::error!(
645                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
646                deltas.instrument_id
647            );
648        }
649    }
650
651    pub fn on_quote_tick(&mut self, tick: QuoteTick) {
652        log::debug!("Processing QuoteTick:{}", tick);
653
654        let instrument_id = &tick.instrument_id;
655        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
656            matching_core.set_bid_raw(tick.bid_price);
657            matching_core.set_ask_raw(tick.ask_price);
658
659            self.iterate_orders(instrument_id);
660        } else {
661            log::error!(
662                "Cannot handle `QuoteTick`: no matching core for instrument {}",
663                tick.instrument_id
664            );
665        }
666    }
667
668    pub fn on_trade_tick(&mut self, tick: TradeTick) {
669        log::debug!("Processing TradeTick:{}", tick);
670
671        let instrument_id = &tick.instrument_id;
672        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
673            matching_core.set_last_raw(tick.price);
674            if !self.subscribed_quotes.contains(instrument_id) {
675                matching_core.set_bid_raw(tick.price);
676                matching_core.set_ask_raw(tick.price);
677            }
678
679            self.iterate_orders(instrument_id);
680        } else {
681            log::error!(
682                "Cannot handle `TradeTick`: no matching core for instrument {}",
683                tick.instrument_id
684            );
685        }
686    }
687
688    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
689        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
690            matching_core.iterate();
691
692            matching_core.get_orders()
693        } else {
694            log::error!(
695                "Cannot iterate orders: no matching core for instrument {}",
696                instrument_id
697            );
698            return;
699        };
700
701        for order in orders {
702            if order.is_closed() {
703                continue;
704            }
705
706            let mut order: OrderAny = order.clone().into();
707            if matches!(
708                order.order_type(),
709                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
710            ) {
711                self.update_trailing_stop_order(&mut order);
712            }
713        }
714    }
715
716    pub fn cancel_order(&mut self, order: &OrderAny) {
717        log::info!("Canceling order {}", order.client_order_id());
718
719        let mut order = order.clone();
720        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
721
722        let trigger_instrument_id = order
723            .trigger_instrument_id()
724            .unwrap_or(order.instrument_id());
725
726        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
727            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
728                log::error!("Cannot delete order: {:?}", e);
729            }
730        }
731
732        self.cache
733            .borrow_mut()
734            .update_order_pending_cancel_local(&order);
735
736        // Generate event
737        let ts_now = self.clock.borrow().timestamp_ns();
738        let event = OrderCanceled::new(
739            order.trader_id(),
740            order.strategy_id(),
741            order.instrument_id(),
742            order.client_order_id(),
743            UUID4::new(),
744            ts_now,
745            ts_now,
746            false,
747            order.venue_order_id(),
748            order.account_id(),
749        );
750
751        self.manager.send_exec_event(OrderEventAny::Canceled(event));
752    }
753
754    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
755        if !self.subscribed_strategies.contains(&strategy_id) {
756            // Subscribe to all strategy events
757            if let Some(handler) = &self.on_event_handler {
758                self.msgbus.borrow_mut().subscribe(
759                    format!("events.order.{strategy_id}"),
760                    handler.clone(),
761                    None,
762                );
763                self.msgbus.borrow_mut().subscribe(
764                    format!("events.position.{strategy_id}"),
765                    handler.clone(),
766                    None,
767                );
768                self.subscribed_strategies.insert(strategy_id);
769                log::info!(
770                    "Subscribed to strategy {} order and position events",
771                    strategy_id
772                );
773            }
774        }
775
776        if let Some(position_id) = position_id {
777            if !self.monitored_positions.contains(&position_id) {
778                self.monitored_positions.insert(position_id);
779            }
780        }
781    }
782
783    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
784        match order.order_type() {
785            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
786                self.fill_limit_order(order);
787            }
788            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
789                self.fill_market_order(order);
790            }
791            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
792        }
793    }
794
795    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
796        if matches!(order.order_type(), OrderType::Limit) {
797            self.fill_market_order(order);
798            return;
799        }
800
801        // Fetch command
802        let mut command = match self
803            .manager
804            .pop_submit_order_command(order.client_order_id())
805        {
806            Some(command) => command,
807            None => return, // Order already released
808        };
809
810        let trigger_instrument_id = order
811            .trigger_instrument_id()
812            .unwrap_or(order.instrument_id());
813
814        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
815            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
816                log::error!("Error deleting order: {:?}", e);
817            }
818
819            let emulation_trigger = TriggerType::NoTrigger;
820
821            // Transform order
822            let mut transformed = if let Ok(transformed) = LimitOrder::new(
823                order.trader_id(),
824                order.strategy_id(),
825                order.instrument_id(),
826                order.client_order_id(),
827                order.order_side(),
828                order.quantity(),
829                order.price().unwrap(),
830                order.time_in_force(),
831                order.expire_time(),
832                order.is_post_only(),
833                order.is_reduce_only(),
834                order.is_quote_quantity(),
835                order.display_qty(),
836                Some(emulation_trigger),
837                Some(trigger_instrument_id),
838                order.contingency_type(),
839                order.order_list_id(),
840                order.linked_order_ids(),
841                order.parent_order_id(),
842                order.exec_algorithm_id(),
843                order.exec_algorithm_params(),
844                order.exec_spawn_id(),
845                order.tags(),
846                UUID4::new(),
847                self.clock.borrow().timestamp_ns(),
848            ) {
849                transformed
850            } else {
851                log::error!("Cannot create limit order");
852                return;
853            };
854            transformed.liquidity_side = order.liquidity_side();
855
856            // TODO: fix
857            // let triggered_price = order.trigger_price();
858            // if triggered_price.is_some() {
859            //     transformed.trigger_price() = (triggered_price.unwrap());
860            // }
861
862            let original_events = order.events();
863
864            for event in original_events {
865                transformed.events.insert(0, event.clone());
866            }
867
868            if let Err(e) = self.cache.borrow_mut().add_order(
869                OrderAny::Limit(transformed.clone()),
870                command.position_id,
871                Some(command.client_id),
872                true,
873            ) {
874                log::error!("Failed to add order: {}", e);
875            }
876
877            // Replace commands order with transformed order
878            command.order = OrderAny::Limit(transformed.clone());
879
880            self.msgbus.borrow().publish(
881                &format!("events.order.{}", order.strategy_id()).into(),
882                transformed.last_event(),
883            );
884
885            // Determine triggered price
886            let released_price = match order.order_side() {
887                OrderSide::Buy => matching_core.ask,
888                OrderSide::Sell => matching_core.bid,
889                _ => panic!("invalid `OrderSide`"),
890            };
891
892            // Generate event
893            let event = OrderReleased::new(
894                order.trader_id(),
895                order.strategy_id(),
896                order.instrument_id(),
897                order.client_order_id(),
898                released_price.unwrap(),
899                UUID4::new(),
900                self.clock.borrow().timestamp_ns(),
901                self.clock.borrow().timestamp_ns(),
902            );
903
904            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
905                log::error!("Failed to apply order event: {}", e);
906            }
907
908            if let Err(e) = self
909                .cache
910                .borrow_mut()
911                .update_order(&OrderAny::Limit(transformed.clone()))
912            {
913                log::error!("Failed to update order: {}", e);
914            }
915
916            self.manager.send_risk_event(OrderEventAny::Released(event));
917
918            log::info!("Releasing order {}", order.client_order_id());
919
920            // Publish event
921            self.msgbus.borrow().publish(
922                &format!("events.order.{}", transformed.strategy_id()).into(),
923                &OrderEventAny::Released(event),
924            );
925
926            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
927                self.manager.send_algo_command(command, exec_algorithm_id);
928            } else {
929                self.manager
930                    .send_exec_command(TradingCommand::SubmitOrder(command));
931            }
932        } else {
933            log::error!(
934                "Cannot fill limit order: no matching core for instrument {}",
935                trigger_instrument_id
936            );
937        }
938    }
939
940    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
941        // Fetch command
942        let mut command = match self
943            .manager
944            .pop_submit_order_command(order.client_order_id())
945        {
946            Some(command) => command,
947            None => panic!("invalid operation `_fill_market_order` with no command"),
948        };
949
950        let trigger_instrument_id = order
951            .trigger_instrument_id()
952            .unwrap_or(order.instrument_id());
953
954        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
955            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
956                log::error!("Cannot delete order: {:?}", e);
957            }
958
959            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
960
961            // Transform order
962            let mut transformed = MarketOrder::new(
963                order.trader_id(),
964                order.strategy_id(),
965                order.instrument_id(),
966                order.client_order_id(),
967                order.order_side(),
968                order.quantity(),
969                order.time_in_force(),
970                UUID4::new(),
971                self.clock.borrow().timestamp_ns(),
972                order.is_reduce_only(),
973                order.is_quote_quantity(),
974                order.contingency_type(),
975                order.order_list_id(),
976                order.linked_order_ids(),
977                order.parent_order_id(),
978                order.exec_algorithm_id(),
979                order.exec_algorithm_params(),
980                order.exec_spawn_id(),
981                order.tags(),
982            );
983
984            let original_events = order.events();
985
986            for event in original_events {
987                transformed.events.insert(0, event.clone());
988            }
989
990            if let Err(e) = self.cache.borrow_mut().add_order(
991                OrderAny::Market(transformed.clone()),
992                command.position_id,
993                Some(command.client_id),
994                true,
995            ) {
996                log::error!("Failed to add order: {}", e);
997            }
998
999            // Replace commands order with transformed order
1000            command.order = OrderAny::Market(transformed.clone());
1001
1002            self.msgbus.borrow().publish(
1003                &format!("events.order.{}", order.strategy_id()).into(),
1004                transformed.last_event(),
1005            );
1006
1007            // Determine triggered price
1008            // TODO: fix unwraps
1009            let released_price = match order.order_side() {
1010                OrderSide::Buy => matching_core.ask,
1011                OrderSide::Sell => matching_core.bid,
1012                _ => panic!("invalid `OrderSide`"),
1013            };
1014
1015            // Generate event
1016            let ts_now = self.clock.borrow().timestamp_ns();
1017            let event = OrderReleased::new(
1018                order.trader_id(),
1019                order.strategy_id(),
1020                order.instrument_id(),
1021                order.client_order_id(),
1022                released_price.unwrap(),
1023                UUID4::new(),
1024                ts_now,
1025                ts_now,
1026            );
1027
1028            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1029                log::error!("Failed to apply order event: {}", e);
1030            }
1031
1032            if let Err(e) = self
1033                .cache
1034                .borrow_mut()
1035                .update_order(&OrderAny::Market(transformed))
1036            {
1037                log::error!("Failed to update order: {}", e);
1038            }
1039            self.manager.send_risk_event(OrderEventAny::Released(event));
1040
1041            log::info!("Releasing order {}", order.client_order_id());
1042
1043            // Publish event
1044            self.msgbus.borrow().publish(
1045                &format!("events.order.{}", order.strategy_id()).into(),
1046                &OrderEventAny::Released(event),
1047            );
1048
1049            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1050                self.manager.send_algo_command(command, exec_algorithm_id);
1051            } else {
1052                self.manager
1053                    .send_exec_command(TradingCommand::SubmitOrder(command));
1054            }
1055        } else {
1056            log::error!(
1057                "Cannot fill limit order: no matching core for instrument {}",
1058                trigger_instrument_id
1059            );
1060        }
1061    }
1062
1063    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1064        if let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) {
1065            let mut bid = None;
1066            let mut ask = None;
1067            let mut last = None;
1068
1069            if matching_core.is_bid_initialized {
1070                bid = matching_core.bid;
1071            }
1072            if matching_core.is_ask_initialized {
1073                ask = matching_core.ask;
1074            }
1075            if matching_core.is_last_initialized {
1076                last = matching_core.last;
1077            }
1078
1079            let quote_tick = self
1080                .cache
1081                .borrow()
1082                .quote(&matching_core.instrument_id)
1083                .copied();
1084            let trade_tick = self
1085                .cache
1086                .borrow()
1087                .trade(&matching_core.instrument_id)
1088                .copied();
1089
1090            if bid.is_none() && quote_tick.is_some() {
1091                bid = Some(quote_tick.unwrap().bid_price);
1092            }
1093            if ask.is_none() && quote_tick.is_some() {
1094                ask = Some(quote_tick.unwrap().ask_price);
1095            }
1096            if last.is_none() && trade_tick.is_some() {
1097                last = Some(trade_tick.unwrap().price);
1098            }
1099
1100            let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1101                trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1102            {
1103                (new_trigger_price, new_price)
1104            } else {
1105                log::warn!("Cannot calculate trailing stop order");
1106                return;
1107            };
1108
1109            let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1110                (None, None) => return, // No updates
1111                _ => (new_trigger_price, new_price),
1112            };
1113
1114            // Generate event
1115            let ts_now = self.clock.borrow().timestamp_ns();
1116            let event = OrderUpdated::new(
1117                order.trader_id(),
1118                order.strategy_id(),
1119                order.instrument_id(),
1120                order.client_order_id(),
1121                order.quantity(),
1122                UUID4::new(),
1123                ts_now,
1124                ts_now,
1125                false,
1126                order.venue_order_id(),
1127                order.account_id(),
1128                new_price,
1129                new_trigger_price,
1130            );
1131
1132            if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1133                log::error!("Failed to apply order event: {}", e);
1134            }
1135            if let Err(e) = self.cache.borrow_mut().update_order(order) {
1136                log::error!("Failed to update order: {}", e);
1137            }
1138
1139            self.manager.send_risk_event(OrderEventAny::Updated(event));
1140        } else {
1141            log::error!(
1142                "Cannot update trailing stop order: no matching core for instrument {}",
1143                order.instrument_id()
1144            );
1145        }
1146    }
1147}