nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::{HashMap, HashSet},
19    rc::Rc,
20};
21
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    logging::{CMD, EVT, RECV},
26    msgbus::{
27        handler::ShareableMessageHandler,
28        {self},
29    },
30};
31use nautilus_core::uuid::UUID4;
32use nautilus_model::{
33    data::{OrderBookDeltas, QuoteTick, TradeTick},
34    enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
35    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
36    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
37    instruments::Instrument,
38    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
39    types::{Price, Quantity},
40};
41
42use crate::{
43    matching_core::OrderMatchingCore,
44    messages::{
45        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
46        cancel::CancelOrderHandlerAny, modify::ModifyOrderHandlerAny,
47        submit::SubmitOrderHandlerAny,
48    },
49    order_manager::manager::OrderManager,
50    trailing::trailing_stop_calculate,
51};
52
53pub struct OrderEmulator {
54    clock: Rc<RefCell<dyn Clock>>,
55    cache: Rc<RefCell<Cache>>,
56    manager: OrderManager,
57    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
58    subscribed_quotes: HashSet<InstrumentId>,
59    subscribed_trades: HashSet<InstrumentId>,
60    subscribed_strategies: HashSet<StrategyId>,
61    monitored_positions: HashSet<PositionId>,
62    on_event_handler: Option<ShareableMessageHandler>,
63}
64
65impl OrderEmulator {
66    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
67        // TODO: Impl Actor Trait
68        // self.register_base(portfolio, msgbus, cache, clock);
69
70        let active_local = true;
71        let manager =
72            OrderManager::new(clock.clone(), cache.clone(), active_local, None, None, None);
73
74        Self {
75            clock,
76            cache,
77            manager,
78            matching_cores: HashMap::new(),
79            subscribed_quotes: HashSet::new(),
80            subscribed_trades: HashSet::new(),
81            subscribed_strategies: HashSet::new(),
82            monitored_positions: HashSet::new(),
83            on_event_handler: None,
84        }
85    }
86
87    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
88        self.on_event_handler = Some(handler);
89    }
90
91    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
92        self.manager.set_submit_order_handler(handler);
93    }
94
95    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
96        self.manager.set_cancel_order_handler(handler);
97    }
98
99    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
100        self.manager.set_modify_order_handler(handler);
101    }
102
103    #[must_use]
104    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
105        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
106        quotes.sort();
107        quotes
108    }
109
110    #[must_use]
111    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
112        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
113        trades.sort();
114        trades
115    }
116
117    #[must_use]
118    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
119        self.manager.get_submit_order_commands()
120    }
121
122    #[must_use]
123    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
124        self.matching_cores.get(instrument_id).cloned()
125    }
126
127    pub fn on_start(&mut self) -> anyhow::Result<()> {
128        let emulated_orders: Vec<OrderAny> = self
129            .cache
130            .borrow()
131            .orders_emulated(None, None, None, None)
132            .into_iter()
133            .cloned()
134            .collect();
135
136        if emulated_orders.is_empty() {
137            log::error!("No emulated orders to reactivate");
138            return Ok(());
139        }
140
141        for order in emulated_orders {
142            if !matches!(
143                order.status(),
144                OrderStatus::Initialized | OrderStatus::Emulated
145            ) {
146                continue; // No longer emulated
147            }
148
149            if let Some(parent_order_id) = &order.parent_order_id() {
150                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
151                    order.clone()
152                } else {
153                    log::error!("Cannot handle order: parent {parent_order_id} not found");
154                    continue;
155                };
156
157                let is_position_closed = parent_order
158                    .position_id()
159                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
160                if parent_order.is_closed() && is_position_closed {
161                    self.manager.cancel_order(&order);
162                    continue; // Parent already closed
163                }
164
165                if parent_order.contingency_type() == Some(ContingencyType::Oto)
166                    && (parent_order.is_active_local()
167                        || parent_order.filled_qty() == Quantity::zero(0))
168                {
169                    continue; // Process contingency order later once parent triggered
170                }
171            }
172
173            let position_id = self
174                .cache
175                .borrow()
176                .position_id(&order.client_order_id())
177                .copied();
178            let client_id = self
179                .cache
180                .borrow()
181                .client_id(&order.client_order_id())
182                .copied();
183
184            let command = SubmitOrder::new(
185                order.trader_id(),
186                client_id.unwrap(),
187                order.strategy_id(),
188                order.instrument_id(),
189                order.client_order_id(),
190                order.venue_order_id().unwrap(),
191                order.clone(),
192                order.exec_algorithm_id(),
193                position_id,
194                UUID4::new(),
195                self.clock.borrow().timestamp_ns(),
196            )?;
197
198            self.handle_submit_order(command);
199        }
200
201        Ok(())
202    }
203
204    pub fn on_event(&mut self, event: OrderEventAny) {
205        log::info!("{RECV}{EVT} {event}");
206
207        self.manager.handle_event(event.clone());
208
209        if let Some(order) = self.cache.borrow().order(&event.client_order_id()) {
210            if order.is_closed() {
211                if let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id()) {
212                    if let Err(e) =
213                        matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
214                    {
215                        log::error!("Error deleting order: {e}");
216                    }
217                }
218            }
219        }
220        // else: Order not in cache yet
221    }
222
223    pub const fn on_stop(&self) {}
224
225    pub fn on_reset(&mut self) {
226        self.manager.reset();
227        self.matching_cores.clear();
228    }
229
230    pub const fn on_dispose(&self) {}
231
232    pub fn execute(&mut self, command: TradingCommand) {
233        log::info!("{RECV}{CMD} {command}");
234
235        match command {
236            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
237            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
238            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
239            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
240            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
241            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
242        }
243    }
244
245    fn create_matching_core(
246        &mut self,
247        instrument_id: InstrumentId,
248        price_increment: Price,
249    ) -> OrderMatchingCore {
250        let matching_core =
251            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
252        self.matching_cores
253            .insert(instrument_id, matching_core.clone());
254        log::info!("Creating matching core for {instrument_id:?}");
255        matching_core
256    }
257
258    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
259        let mut order = command.order.clone();
260        let emulation_trigger = order.emulation_trigger();
261
262        assert_ne!(
263            emulation_trigger,
264            Some(TriggerType::NoTrigger),
265            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
266        );
267        assert!(
268            self.manager
269                .get_submit_order_commands()
270                .contains_key(&order.client_order_id()),
271            "command.order.client_order_id must be in submit_order_commands"
272        );
273
274        if !matches!(
275            emulation_trigger,
276            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
277        ) {
278            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
279            self.manager.cancel_order(&order);
280            return;
281        }
282
283        self.check_monitoring(command.strategy_id, command.position_id);
284
285        // Get or create matching core
286        let trigger_instrument_id = order
287            .trigger_instrument_id()
288            .unwrap_or_else(|| order.instrument_id());
289
290        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
291
292        let mut matching_core = if let Some(core) = matching_core {
293            core
294        } else {
295            // Handle synthetic instruments
296            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
297                let synthetic = self
298                    .cache
299                    .borrow()
300                    .synthetic(&trigger_instrument_id)
301                    .cloned();
302                if let Some(synthetic) = synthetic {
303                    (synthetic.id, synthetic.price_increment)
304                } else {
305                    log::error!(
306                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
307                    );
308                    self.manager.cancel_order(&order);
309                    return;
310                }
311            } else {
312                let instrument = self
313                    .cache
314                    .borrow()
315                    .instrument(&trigger_instrument_id)
316                    .cloned();
317                if let Some(instrument) = instrument {
318                    (instrument.id(), instrument.price_increment())
319                } else {
320                    log::error!(
321                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
322                    );
323                    self.manager.cancel_order(&order);
324                    return;
325                }
326            };
327
328            self.create_matching_core(instrument_id, price_increment)
329        };
330
331        // Update trailing stop
332        if matches!(
333            order.order_type(),
334            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
335        ) {
336            self.update_trailing_stop_order(&mut order);
337            if order.trigger_price().is_none() {
338                log::error!(
339                    "Cannot handle trailing stop order with no trigger_price and no market updates"
340                );
341
342                self.manager.cancel_order(&order);
343                return;
344            }
345        }
346
347        // Cache command
348        self.manager.cache_submit_order_command(command);
349
350        // Check if immediately marketable
351        matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
352
353        // Handle data subscriptions
354        match emulation_trigger.unwrap() {
355            TriggerType::Default | TriggerType::BidAsk => {
356                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
357                    if !trigger_instrument_id.is_synthetic() {
358                        // TODO: Impl Actor Trait
359                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
360                    }
361                    // TODO: Impl Actor Trait
362                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
363                    self.subscribed_quotes.insert(trigger_instrument_id);
364                }
365            }
366            TriggerType::LastPrice => {
367                if !self.subscribed_trades.contains(&trigger_instrument_id) {
368                    // TODO: Impl Actor Trait
369                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
370                    self.subscribed_trades.insert(trigger_instrument_id);
371                }
372            }
373            _ => {
374                log::error!("Invalid TriggerType: {emulation_trigger:?}");
375                return;
376            }
377        }
378
379        // Check if order was already released
380        if !self
381            .manager
382            .get_submit_order_commands()
383            .contains_key(&order.client_order_id())
384        {
385            return; // Already released
386        }
387
388        // Hold in matching core
389        if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
390            log::error!("Cannot add order: {e:?}");
391            return;
392        }
393
394        // Generate emulated event if needed
395        if order.status() == OrderStatus::Initialized {
396            let event = OrderEmulated::new(
397                order.trader_id(),
398                order.strategy_id(),
399                order.instrument_id(),
400                order.client_order_id(),
401                UUID4::new(),
402                self.clock.borrow().timestamp_ns(),
403                self.clock.borrow().timestamp_ns(),
404            );
405
406            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
407                log::error!("Cannot apply order event: {e:?}");
408                return;
409            }
410
411            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
412                log::error!("Cannot update order: {e:?}");
413                return;
414            }
415
416            self.manager.send_risk_event(OrderEventAny::Emulated(event));
417
418            msgbus::publish(
419                &format!("events.order.{}", order.strategy_id()).into(),
420                &OrderEventAny::Emulated(event),
421            );
422        }
423
424        // Since we are cloning the matching core, we need to insert it back into the original hashmap
425        self.matching_cores
426            .insert(trigger_instrument_id, matching_core);
427
428        log::info!("Emulating {order}");
429    }
430
431    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
432        self.check_monitoring(command.strategy_id, command.position_id);
433
434        for order in &command.order_list.orders {
435            if let Some(parent_order_id) = order.parent_order_id() {
436                let cache = self.cache.borrow();
437                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
438                    parent_order
439                } else {
440                    log::error!("Parent order for {} not found", order.client_order_id());
441                    continue;
442                };
443
444                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
445                    continue; // Process contingency order later once parent triggered
446                }
447            }
448
449            if let Err(e) = self.manager.create_new_submit_order(
450                order,
451                command.position_id,
452                Some(command.client_id),
453            ) {
454                log::error!("Error creating new submit order: {e}");
455            }
456        }
457    }
458
459    fn handle_modify_order(&mut self, command: ModifyOrder) {
460        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
461            let price = match command.price {
462                Some(price) => Some(price),
463                None => order.price(),
464            };
465
466            let trigger_price = match command.trigger_price {
467                Some(trigger_price) => Some(trigger_price),
468                None => order.trigger_price(),
469            };
470
471            // Generate event
472            let ts_now = self.clock.borrow().timestamp_ns();
473            let event = OrderUpdated::new(
474                order.trader_id(),
475                order.strategy_id(),
476                order.instrument_id(),
477                order.client_order_id(),
478                command.quantity.unwrap_or(order.quantity()),
479                UUID4::new(),
480                ts_now,
481                ts_now,
482                false,
483                order.venue_order_id(),
484                order.account_id(),
485                price,
486                trigger_price,
487            );
488
489            self.manager.send_exec_event(OrderEventAny::Updated(event));
490
491            let trigger_instrument_id = order
492                .trigger_instrument_id()
493                .unwrap_or_else(|| order.instrument_id());
494
495            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
496                matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
497            } else {
498                log::error!(
499                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
500                );
501            }
502        } else {
503            log::error!("Cannot modify order: {} not found", command.client_order_id);
504        }
505    }
506
507    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
508        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
509            order.clone()
510        } else {
511            log::error!("Cannot cancel order: {} not found", command.client_order_id);
512            return;
513        };
514
515        let trigger_instrument_id = order
516            .trigger_instrument_id()
517            .unwrap_or_else(|| order.instrument_id());
518
519        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
520            core
521        } else {
522            self.manager.cancel_order(&order);
523            return;
524        };
525
526        if !matching_core.order_exists(order.client_order_id())
527            && order.is_open()
528            && !order.is_pending_cancel()
529        {
530            // Order not held in the emulator
531            self.manager
532                .send_exec_command(TradingCommand::CancelOrder(command));
533        } else {
534            self.manager.cancel_order(&order);
535        }
536    }
537
538    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
539        let matching_core = match self.matching_cores.get(&command.instrument_id) {
540            Some(core) => core,
541            None => return, // No orders to cancel
542        };
543
544        let orders_to_cancel = match command.order_side {
545            OrderSide::NoOrderSide => {
546                // Get both bid and ask orders
547                let mut all_orders = Vec::new();
548                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
549                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
550                all_orders
551            }
552            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
553            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
554        };
555
556        // Process all orders in a single iteration
557        for order in orders_to_cancel {
558            self.manager.cancel_order(&OrderAny::from(order));
559        }
560    }
561
562    // --------------------------------------------------------------------------------------------
563
564    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
565        log::info!(
566            "Updating order {} quantity to {new_quantity}",
567            order.client_order_id(),
568        );
569
570        // Generate event
571        let ts_now = self.clock.borrow().timestamp_ns();
572        let event = OrderUpdated::new(
573            order.trader_id(),
574            order.strategy_id(),
575            order.instrument_id(),
576            order.client_order_id(),
577            new_quantity,
578            UUID4::new(),
579            ts_now,
580            ts_now,
581            false,
582            None,
583            order.account_id(),
584            None,
585            None,
586        );
587
588        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
589            log::error!("Cannot apply order event: {e:?}");
590            return;
591        }
592        if let Err(e) = self.cache.borrow_mut().update_order(order) {
593            log::error!("Cannot update order: {e:?}");
594            return;
595        }
596
597        self.manager.send_risk_event(OrderEventAny::Updated(event));
598    }
599
600    // -----------------------------------------------------------------------------------------------
601
602    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
603        log::debug!("Processing {deltas:?}");
604
605        let instrument_id = &deltas.instrument_id;
606        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
607            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
608                let best_bid = book.best_bid_price();
609                let best_ask = book.best_ask_price();
610
611                if let Some(best_bid) = best_bid {
612                    matching_core.set_bid_raw(best_bid);
613                }
614
615                if let Some(best_ask) = best_ask {
616                    matching_core.set_ask_raw(best_ask);
617                }
618            } else {
619                log::error!(
620                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
621                    deltas.instrument_id
622                );
623            }
624
625            self.iterate_orders(instrument_id);
626        } else {
627            log::error!(
628                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
629                deltas.instrument_id
630            );
631        }
632    }
633
634    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
635        log::debug!("Processing {quote}:?");
636
637        let instrument_id = &quote.instrument_id;
638        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
639            matching_core.set_bid_raw(quote.bid_price);
640            matching_core.set_ask_raw(quote.ask_price);
641
642            self.iterate_orders(instrument_id);
643        } else {
644            log::error!(
645                "Cannot handle `QuoteTick`: no matching core for instrument {}",
646                quote.instrument_id
647            );
648        }
649    }
650
651    pub fn on_trade_tick(&mut self, trade: TradeTick) {
652        log::debug!("Processing {trade:?}");
653
654        let instrument_id = &trade.instrument_id;
655        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
656            matching_core.set_last_raw(trade.price);
657            if !self.subscribed_quotes.contains(instrument_id) {
658                matching_core.set_bid_raw(trade.price);
659                matching_core.set_ask_raw(trade.price);
660            }
661
662            self.iterate_orders(instrument_id);
663        } else {
664            log::error!(
665                "Cannot handle `TradeTick`: no matching core for instrument {}",
666                trade.instrument_id
667            );
668        }
669    }
670
671    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
672        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
673            matching_core.iterate();
674
675            matching_core.get_orders()
676        } else {
677            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
678            return;
679        };
680
681        for order in orders {
682            if order.is_closed() {
683                continue;
684            }
685
686            let mut order: OrderAny = order.clone().into();
687            if matches!(
688                order.order_type(),
689                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
690            ) {
691                self.update_trailing_stop_order(&mut order);
692            }
693        }
694    }
695
696    pub fn cancel_order(&mut self, order: &OrderAny) {
697        log::info!("Canceling order {}", order.client_order_id());
698
699        let mut order = order.clone();
700        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
701
702        let trigger_instrument_id = order
703            .trigger_instrument_id()
704            .unwrap_or(order.instrument_id());
705
706        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
707            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
708                log::error!("Cannot delete order: {e:?}");
709            }
710        }
711
712        self.cache
713            .borrow_mut()
714            .update_order_pending_cancel_local(&order);
715
716        // Generate event
717        let ts_now = self.clock.borrow().timestamp_ns();
718        let event = OrderCanceled::new(
719            order.trader_id(),
720            order.strategy_id(),
721            order.instrument_id(),
722            order.client_order_id(),
723            UUID4::new(),
724            ts_now,
725            ts_now,
726            false,
727            order.venue_order_id(),
728            order.account_id(),
729        );
730
731        self.manager.send_exec_event(OrderEventAny::Canceled(event));
732    }
733
734    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
735        if !self.subscribed_strategies.contains(&strategy_id) {
736            // Subscribe to all strategy events
737            if let Some(handler) = &self.on_event_handler {
738                msgbus::subscribe(format!("events.order.{strategy_id}"), handler.clone(), None);
739                msgbus::subscribe(
740                    format!("events.position.{strategy_id}"),
741                    handler.clone(),
742                    None,
743                );
744                self.subscribed_strategies.insert(strategy_id);
745                log::info!("Subscribed to strategy {strategy_id} order and position events");
746            }
747        }
748
749        if let Some(position_id) = position_id {
750            if !self.monitored_positions.contains(&position_id) {
751                self.monitored_positions.insert(position_id);
752            }
753        }
754    }
755
756    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
757        match order.order_type() {
758            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
759                self.fill_limit_order(order);
760            }
761            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
762                self.fill_market_order(order);
763            }
764            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
765        }
766    }
767
768    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
769        if matches!(order.order_type(), OrderType::Limit) {
770            self.fill_market_order(order);
771            return;
772        }
773
774        // Fetch command
775        let mut command = match self
776            .manager
777            .pop_submit_order_command(order.client_order_id())
778        {
779            Some(command) => command,
780            None => return, // Order already released
781        };
782
783        let trigger_instrument_id = order
784            .trigger_instrument_id()
785            .unwrap_or(order.instrument_id());
786
787        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
788            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
789                log::error!("Error deleting order: {e:?}");
790            }
791
792            let emulation_trigger = TriggerType::NoTrigger;
793
794            // Transform order
795            let mut transformed = if let Ok(transformed) = LimitOrder::new(
796                order.trader_id(),
797                order.strategy_id(),
798                order.instrument_id(),
799                order.client_order_id(),
800                order.order_side(),
801                order.quantity(),
802                order.price().unwrap(),
803                order.time_in_force(),
804                order.expire_time(),
805                order.is_post_only(),
806                order.is_reduce_only(),
807                order.is_quote_quantity(),
808                order.display_qty(),
809                Some(emulation_trigger),
810                Some(trigger_instrument_id),
811                order.contingency_type(),
812                order.order_list_id(),
813                order.linked_order_ids().map(Vec::from),
814                order.parent_order_id(),
815                order.exec_algorithm_id(),
816                order.exec_algorithm_params().cloned(),
817                order.exec_spawn_id(),
818                order.tags().map(Vec::from),
819                UUID4::new(),
820                self.clock.borrow().timestamp_ns(),
821            ) {
822                transformed
823            } else {
824                log::error!("Cannot create limit order");
825                return;
826            };
827            transformed.liquidity_side = order.liquidity_side();
828
829            // TODO: fix
830            // let triggered_price = order.trigger_price();
831            // if triggered_price.is_some() {
832            //     transformed.trigger_price() = (triggered_price.unwrap());
833            // }
834
835            let original_events = order.events();
836
837            for event in original_events {
838                transformed.events.insert(0, event.clone());
839            }
840
841            if let Err(e) = self.cache.borrow_mut().add_order(
842                OrderAny::Limit(transformed.clone()),
843                command.position_id,
844                Some(command.client_id),
845                true,
846            ) {
847                log::error!("Failed to add order: {e}");
848            }
849
850            // Replace commands order with transformed order
851            command.order = OrderAny::Limit(transformed.clone());
852
853            msgbus::publish(
854                &format!("events.order.{}", order.strategy_id()).into(),
855                transformed.last_event(),
856            );
857
858            // Determine triggered price
859            let released_price = match order.order_side() {
860                OrderSide::Buy => matching_core.ask,
861                OrderSide::Sell => matching_core.bid,
862                _ => panic!("invalid `OrderSide`"),
863            };
864
865            // Generate event
866            let event = OrderReleased::new(
867                order.trader_id(),
868                order.strategy_id(),
869                order.instrument_id(),
870                order.client_order_id(),
871                released_price.unwrap(),
872                UUID4::new(),
873                self.clock.borrow().timestamp_ns(),
874                self.clock.borrow().timestamp_ns(),
875            );
876
877            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
878                log::error!("Failed to apply order event: {e}");
879            }
880
881            if let Err(e) = self
882                .cache
883                .borrow_mut()
884                .update_order(&OrderAny::Limit(transformed.clone()))
885            {
886                log::error!("Failed to update order: {e}");
887            }
888
889            self.manager.send_risk_event(OrderEventAny::Released(event));
890
891            log::info!("Releasing order {}", order.client_order_id());
892
893            // Publish event
894            msgbus::publish(
895                &format!("events.order.{}", transformed.strategy_id()).into(),
896                &OrderEventAny::Released(event),
897            );
898
899            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
900                self.manager.send_algo_command(command, exec_algorithm_id);
901            } else {
902                self.manager
903                    .send_exec_command(TradingCommand::SubmitOrder(command));
904            }
905        } else {
906            log::error!(
907                "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
908            );
909        }
910    }
911
912    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
913        // Fetch command
914        let mut command = match self
915            .manager
916            .pop_submit_order_command(order.client_order_id())
917        {
918            Some(command) => command,
919            None => panic!("invalid operation `_fill_market_order` with no command"),
920        };
921
922        let trigger_instrument_id = order
923            .trigger_instrument_id()
924            .unwrap_or(order.instrument_id());
925
926        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
927            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
928                log::error!("Cannot delete order: {e:?}");
929            }
930
931            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
932
933            // Transform order
934            let mut transformed = MarketOrder::new(
935                order.trader_id(),
936                order.strategy_id(),
937                order.instrument_id(),
938                order.client_order_id(),
939                order.order_side(),
940                order.quantity(),
941                order.time_in_force(),
942                UUID4::new(),
943                self.clock.borrow().timestamp_ns(),
944                order.is_reduce_only(),
945                order.is_quote_quantity(),
946                order.contingency_type(),
947                order.order_list_id(),
948                order.linked_order_ids().map(Vec::from),
949                order.parent_order_id(),
950                order.exec_algorithm_id(),
951                order.exec_algorithm_params().cloned(),
952                order.exec_spawn_id(),
953                order.tags().map(Vec::from),
954            );
955
956            let original_events = order.events();
957
958            for event in original_events {
959                transformed.events.insert(0, event.clone());
960            }
961
962            if let Err(e) = self.cache.borrow_mut().add_order(
963                OrderAny::Market(transformed.clone()),
964                command.position_id,
965                Some(command.client_id),
966                true,
967            ) {
968                log::error!("Failed to add order: {e}");
969            }
970
971            // Replace commands order with transformed order
972            command.order = OrderAny::Market(transformed.clone());
973
974            msgbus::publish(
975                &format!("events.order.{}", order.strategy_id()).into(),
976                transformed.last_event(),
977            );
978
979            // Determine triggered price
980            // TODO: fix unwraps
981            let released_price = match order.order_side() {
982                OrderSide::Buy => matching_core.ask,
983                OrderSide::Sell => matching_core.bid,
984                _ => panic!("invalid `OrderSide`"),
985            };
986
987            // Generate event
988            let ts_now = self.clock.borrow().timestamp_ns();
989            let event = OrderReleased::new(
990                order.trader_id(),
991                order.strategy_id(),
992                order.instrument_id(),
993                order.client_order_id(),
994                released_price.unwrap(),
995                UUID4::new(),
996                ts_now,
997                ts_now,
998            );
999
1000            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1001                log::error!("Failed to apply order event: {e}");
1002            }
1003
1004            if let Err(e) = self
1005                .cache
1006                .borrow_mut()
1007                .update_order(&OrderAny::Market(transformed))
1008            {
1009                log::error!("Failed to update order: {e}");
1010            }
1011            self.manager.send_risk_event(OrderEventAny::Released(event));
1012
1013            log::info!("Releasing order {}", order.client_order_id());
1014
1015            // Publish event
1016            msgbus::publish(
1017                &format!("events.order.{}", order.strategy_id()).into(),
1018                &OrderEventAny::Released(event),
1019            );
1020
1021            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1022                self.manager.send_algo_command(command, exec_algorithm_id);
1023            } else {
1024                self.manager
1025                    .send_exec_command(TradingCommand::SubmitOrder(command));
1026            }
1027        } else {
1028            log::error!(
1029                "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1030            );
1031        }
1032    }
1033
1034    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1035        if let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) {
1036            let mut bid = None;
1037            let mut ask = None;
1038            let mut last = None;
1039
1040            if matching_core.is_bid_initialized {
1041                bid = matching_core.bid;
1042            }
1043            if matching_core.is_ask_initialized {
1044                ask = matching_core.ask;
1045            }
1046            if matching_core.is_last_initialized {
1047                last = matching_core.last;
1048            }
1049
1050            let quote_tick = self
1051                .cache
1052                .borrow()
1053                .quote(&matching_core.instrument_id)
1054                .copied();
1055            let trade_tick = self
1056                .cache
1057                .borrow()
1058                .trade(&matching_core.instrument_id)
1059                .copied();
1060
1061            if bid.is_none() && quote_tick.is_some() {
1062                bid = Some(quote_tick.unwrap().bid_price);
1063            }
1064            if ask.is_none() && quote_tick.is_some() {
1065                ask = Some(quote_tick.unwrap().ask_price);
1066            }
1067            if last.is_none() && trade_tick.is_some() {
1068                last = Some(trade_tick.unwrap().price);
1069            }
1070
1071            let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1072                trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1073            {
1074                (new_trigger_price, new_price)
1075            } else {
1076                log::warn!("Cannot calculate trailing stop order");
1077                return;
1078            };
1079
1080            let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1081                (None, None) => return, // No updates
1082                _ => (new_trigger_price, new_price),
1083            };
1084
1085            // Generate event
1086            let ts_now = self.clock.borrow().timestamp_ns();
1087            let event = OrderUpdated::new(
1088                order.trader_id(),
1089                order.strategy_id(),
1090                order.instrument_id(),
1091                order.client_order_id(),
1092                order.quantity(),
1093                UUID4::new(),
1094                ts_now,
1095                ts_now,
1096                false,
1097                order.venue_order_id(),
1098                order.account_id(),
1099                new_price,
1100                new_trigger_price,
1101            );
1102
1103            if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1104                log::error!("Failed to apply order event: {e}");
1105            }
1106            if let Err(e) = self.cache.borrow_mut().update_order(order) {
1107                log::error!("Failed to update order: {e}");
1108            }
1109
1110            self.manager.send_risk_event(OrderEventAny::Updated(event));
1111        } else {
1112            log::error!(
1113                "Cannot update trailing stop order: no matching core for instrument {}",
1114                order.instrument_id()
1115            );
1116        }
1117    }
1118}