nautilus_execution/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::RefCell,
19    collections::{HashMap, HashSet},
20    rc::Rc,
21};
22
23use anyhow::Result;
24use nautilus_common::{
25    cache::Cache,
26    clock::Clock,
27    logging::{CMD, EVT, RECV},
28    messages::data::DataResponse,
29    msgbus::{
30        handler::{MessageHandler, ShareableMessageHandler},
31        MessageBus,
32    },
33};
34use nautilus_core::uuid::UUID4;
35use nautilus_model::{
36    data::{Data, OrderBookDeltas, QuoteTick, TradeTick},
37    enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
38    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
39    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
40    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
41    types::{Price, Quantity},
42};
43use ustr::Ustr;
44
45use crate::{
46    manager::OrderManager,
47    matching_core::OrderMatchingCore,
48    messages::{
49        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
50    },
51    trailing::trailing_stop_calculate,
52};
53
54pub struct OrderEmulator {
55    clock: Rc<RefCell<dyn Clock>>,
56    cache: Rc<RefCell<Cache>>,
57    _msgbus: Rc<RefCell<MessageBus>>,
58    manager: Rc<RefCell<OrderManager>>,
59    state: Rc<RefCell<OrderEmulatorState>>,
60}
61
62struct OrderEmulatorExecuteHandler {
63    id: Ustr,
64    callback: Box<dyn Fn(&TradingCommand)>,
65}
66
67impl MessageHandler for OrderEmulatorExecuteHandler {
68    fn id(&self) -> Ustr {
69        self.id
70    }
71
72    fn handle(&self, msg: &dyn Any) {
73        (self.callback)(msg.downcast_ref::<&TradingCommand>().unwrap());
74    }
75    fn handle_response(&self, _resp: DataResponse) {}
76    fn handle_data(&self, _data: Data) {}
77    fn as_any(&self) -> &dyn Any {
78        self
79    }
80}
81
82impl OrderEmulator {
83    pub fn new(
84        clock: Rc<RefCell<dyn Clock>>,
85        cache: Rc<RefCell<Cache>>,
86        msgbus: Rc<RefCell<MessageBus>>,
87    ) -> Self {
88        // TODO: Impl Actor Trait
89        // self.register_base(portfolio, msgbus, cache, clock);
90
91        let active_local = true;
92        let manager = Rc::new(RefCell::new(OrderManager::new(
93            clock.clone(),
94            msgbus.clone(),
95            cache.clone(),
96            active_local,
97            None,
98            None,
99            None,
100        )));
101        let state = Rc::new(RefCell::new(OrderEmulatorState::new(
102            clock.clone(),
103            cache.clone(),
104            msgbus.clone(),
105            manager.clone(),
106        )));
107
108        let handler = {
109            let state = state.clone();
110            ShareableMessageHandler(Rc::new(OrderEmulatorExecuteHandler {
111                id: Ustr::from(&UUID4::new().to_string()),
112                callback: Box::new(move |command: &TradingCommand| {
113                    state.borrow_mut().execute(command.clone());
114                }),
115            }))
116        };
117
118        msgbus
119            .borrow_mut()
120            .register("OrderEmulator.execute", handler);
121
122        Self {
123            clock,
124            cache,
125            _msgbus: msgbus,
126            manager,
127            state,
128        }
129    }
130
131    #[must_use]
132    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
133        let mut quotes: Vec<InstrumentId> = self
134            .state
135            .borrow()
136            .subscribed_quotes
137            .iter()
138            .copied()
139            .collect();
140        quotes.sort();
141        quotes
142    }
143
144    #[must_use]
145    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
146        let mut trades: Vec<_> = self
147            .state
148            .borrow()
149            .subscribed_trades
150            .iter()
151            .copied()
152            .collect();
153        trades.sort();
154        trades
155    }
156
157    #[must_use]
158    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
159        self.manager.borrow().get_submit_order_commands()
160    }
161
162    #[must_use]
163    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
164        self.state
165            .borrow()
166            .matching_cores
167            .borrow()
168            .get(instrument_id)
169            .cloned()
170    }
171
172    // Action Implementations
173    pub fn on_start(&mut self) -> Result<()> {
174        let emulated_orders: Vec<OrderAny> = self
175            .cache
176            .borrow()
177            .orders_emulated(None, None, None, None)
178            .into_iter()
179            .cloned()
180            .collect();
181
182        if emulated_orders.is_empty() {
183            log::error!("No emulated orders to reactivate");
184            return Ok(());
185        }
186
187        for order in emulated_orders {
188            if !matches!(
189                order.status(),
190                OrderStatus::Initialized | OrderStatus::Emulated
191            ) {
192                continue; // No longer emulated
193            }
194
195            if let Some(parent_order_id) = &order.parent_order_id() {
196                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
197                    order.clone()
198                } else {
199                    log::error!("Cannot handle order: parent {parent_order_id} not found");
200                    continue;
201                };
202
203                let is_position_closed = parent_order
204                    .position_id()
205                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
206                if parent_order.is_closed() && is_position_closed {
207                    self.state.borrow_mut().manager_cancel_order(order.clone());
208                    continue; // Parent already closed
209                }
210
211                if parent_order.contingency_type() == Some(ContingencyType::Oto)
212                    && (parent_order.is_active_local()
213                        || parent_order.filled_qty() == Quantity::zero(0))
214                {
215                    continue; // Process contingency order later once parent triggered
216                }
217            }
218
219            let position_id = self
220                .cache
221                .borrow()
222                .position_id(&order.client_order_id())
223                .copied();
224            let client_id = self
225                .cache
226                .borrow()
227                .client_id(&order.client_order_id())
228                .copied();
229
230            let command = match SubmitOrder::new(
231                order.trader_id(),
232                client_id.unwrap(),
233                order.strategy_id(),
234                order.instrument_id(),
235                order.client_order_id(),
236                order.venue_order_id().unwrap(),
237                order.clone(),
238                order.exec_algorithm_id(),
239                position_id,
240                UUID4::new(),
241                self.clock.borrow().timestamp_ns(),
242            ) {
243                Ok(command) => command,
244                Err(e) => {
245                    log::error!("Cannot create submit order command: {}", e);
246                    continue;
247                }
248            };
249
250            self.state.borrow_mut().handle_submit_order(command);
251        }
252
253        Ok(())
254    }
255
256    pub const fn on_stop(&self) {}
257
258    pub fn on_reset(&mut self) {
259        self.manager.borrow_mut().reset();
260        self.state.borrow_mut().matching_cores.borrow_mut().clear();
261    }
262
263    pub const fn on_dispose(&self) {}
264
265    // --------------------------------------------------------------------------------------------
266
267    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
268        log::info!(
269            "Updating order {} quantity to {}",
270            order.client_order_id(),
271            new_quantity
272        );
273
274        // Generate event
275        let ts_now = self.clock.borrow().timestamp_ns();
276        let event = OrderUpdated::new(
277            order.trader_id(),
278            order.strategy_id(),
279            order.instrument_id(),
280            order.client_order_id(),
281            new_quantity,
282            UUID4::new(),
283            ts_now,
284            ts_now,
285            false,
286            None,
287            order.account_id(),
288            None,
289            None,
290        );
291
292        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
293            log::error!("Cannot apply order event: {:?}", e);
294            return;
295        }
296        if let Err(e) = self.cache.borrow_mut().update_order(order) {
297            log::error!("Cannot update order: {:?}", e);
298            return;
299        }
300
301        self.manager
302            .borrow()
303            .send_risk_event(OrderEventAny::Updated(event));
304    }
305
306    // -----------------------------------------------------------------------------------------------
307
308    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
309        log::debug!("Processing OrderBookDeltas:{}", deltas);
310
311        let mut matching_core = if let Some(matching_core) = self
312            .state
313            .borrow()
314            .matching_cores
315            .borrow()
316            .get(&deltas.instrument_id)
317        {
318            matching_core.clone()
319        } else {
320            log::error!(
321                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
322                deltas.instrument_id
323            );
324            return;
325        };
326
327        let borrowed_cache = self.cache.borrow();
328        let book = if let Some(book) = borrowed_cache.order_book(&deltas.instrument_id) {
329            book
330        } else {
331            log::error!(
332                "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
333                deltas.instrument_id
334            );
335            return;
336        };
337
338        let best_bid = book.best_bid_price();
339        let best_ask = book.best_ask_price();
340
341        if let Some(best_bid) = best_bid {
342            matching_core.set_bid_raw(best_bid);
343        }
344
345        if let Some(best_ask) = best_ask {
346            matching_core.set_ask_raw(best_ask);
347        }
348
349        drop(borrowed_cache);
350        self.iterate_orders(&mut matching_core);
351
352        self.state
353            .borrow_mut()
354            .matching_cores
355            .borrow_mut()
356            .insert(deltas.instrument_id, matching_core);
357    }
358
359    pub fn on_quote_tick(&mut self, tick: QuoteTick) {
360        log::debug!("Processing QuoteTick:{}", tick);
361
362        let mut matching_core = if let Some(matching_core) = self
363            .state
364            .borrow()
365            .matching_cores
366            .borrow()
367            .get(&tick.instrument_id)
368        {
369            matching_core.clone()
370        } else {
371            log::error!(
372                "Cannot handle `QuoteTick`: no matching core for instrument {}",
373                tick.instrument_id
374            );
375            return;
376        };
377
378        matching_core.set_bid_raw(tick.bid_price);
379        matching_core.set_ask_raw(tick.ask_price);
380
381        self.iterate_orders(&mut matching_core);
382
383        self.state
384            .borrow_mut()
385            .matching_cores
386            .borrow_mut()
387            .insert(tick.instrument_id, matching_core);
388    }
389
390    pub fn on_trade_tick(&mut self, tick: TradeTick) {
391        log::debug!("Processing TradeTick:{}", tick);
392
393        let borrowed_state = self.state.borrow();
394        let mut matching_core = if let Some(matching_core) = borrowed_state
395            .matching_cores
396            .borrow()
397            .get(&tick.instrument_id)
398        {
399            matching_core.clone()
400        } else {
401            log::error!(
402                "Cannot handle `TradeTick`: no matching core for instrument {}",
403                tick.instrument_id
404            );
405            return;
406        };
407
408        matching_core.set_last_raw(tick.price);
409        if !self
410            .state
411            .borrow()
412            .subscribed_quotes
413            .contains(&tick.instrument_id)
414        {
415            matching_core.set_bid_raw(tick.price);
416            matching_core.set_ask_raw(tick.price);
417        }
418
419        drop(borrowed_state);
420        self.iterate_orders(&mut matching_core);
421
422        self.state
423            .borrow_mut()
424            .matching_cores
425            .borrow_mut()
426            .insert(tick.instrument_id, matching_core);
427    }
428
429    pub fn on_event(&mut self, event: OrderEventAny) {
430        OrderEmulatorState::on_event(
431            event,
432            self.manager.clone(),
433            self.cache.clone(),
434            self.state.borrow().matching_cores.clone(),
435        );
436    }
437
438    fn iterate_orders(&mut self, matching_core: &mut OrderMatchingCore) {
439        matching_core.iterate();
440
441        let orders = matching_core.get_orders_ask().iter().cloned();
442        for order in orders {
443            if order.is_closed() {
444                continue;
445            }
446
447            let mut order: OrderAny = order.clone().into();
448            if matches!(
449                order.order_type(),
450                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
451            ) {
452                self.state
453                    .borrow_mut()
454                    .update_trailing_stop_order(matching_core, &mut order);
455            }
456        }
457    }
458}
459
460struct OrderEmulatorEventHandler {
461    id: Ustr,
462    callback: Box<dyn Fn(&OrderEventAny)>,
463}
464
465impl MessageHandler for OrderEmulatorEventHandler {
466    fn id(&self) -> Ustr {
467        self.id
468    }
469
470    fn handle(&self, msg: &dyn Any) {
471        (self.callback)(msg.downcast_ref::<&OrderEventAny>().unwrap());
472    }
473    fn handle_response(&self, _resp: DataResponse) {}
474    fn handle_data(&self, _data: Data) {}
475    fn as_any(&self) -> &dyn Any {
476        self
477    }
478}
479
480pub struct OrderEmulatorState {
481    clock: Rc<RefCell<dyn Clock>>,
482    cache: Rc<RefCell<Cache>>,
483    msgbus: Rc<RefCell<MessageBus>>,
484    manager: Rc<RefCell<OrderManager>>,
485    matching_cores: Rc<RefCell<HashMap<InstrumentId, OrderMatchingCore>>>,
486    subscribed_quotes: HashSet<InstrumentId>,
487    subscribed_trades: HashSet<InstrumentId>,
488    subscribed_strategies: HashSet<StrategyId>,
489    monitored_positions: HashSet<PositionId>,
490}
491
492impl OrderEmulatorState {
493    pub fn new(
494        clock: Rc<RefCell<dyn Clock>>,
495        cache: Rc<RefCell<Cache>>,
496        msgbus: Rc<RefCell<MessageBus>>,
497        manager: Rc<RefCell<OrderManager>>,
498    ) -> Self {
499        Self {
500            manager,
501            matching_cores: Rc::new(RefCell::new(HashMap::new())),
502            subscribed_quotes: HashSet::new(),
503            subscribed_trades: HashSet::new(),
504            subscribed_strategies: HashSet::new(),
505            monitored_positions: HashSet::new(),
506            clock,
507            cache,
508            msgbus,
509        }
510    }
511
512    pub fn execute(&mut self, command: TradingCommand) {
513        log::info!("{RECV}{CMD} {command}");
514
515        match command {
516            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
517            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
518            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
519            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
520            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
521            _ => log::error!("Cannot handle command: unrecognized {:?}", command),
522        }
523    }
524
525    fn handle_submit_order(&mut self, command: SubmitOrder) {
526        let mut order = command.order.clone();
527        let emulation_trigger = order.emulation_trigger();
528
529        assert!(
530            emulation_trigger != Some(TriggerType::NoTrigger),
531            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
532        );
533        assert!(
534            self.manager
535                .borrow()
536                .get_submit_order_commands()
537                .contains_key(&order.client_order_id()),
538            "command.order.client_order_id must be in submit_order_commands"
539        );
540
541        if !matches!(
542            emulation_trigger,
543            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
544        ) {
545            log::error!(
546                "Cannot emulate order: `TriggerType` {:?} not supported",
547                emulation_trigger
548            );
549            self.manager_cancel_order(order.clone());
550            return;
551        }
552
553        self.check_monitoring(command.strategy_id, command.position_id);
554
555        // Get or create matching core
556        let trigger_instrument_id = order
557            .trigger_instrument_id()
558            .unwrap_or_else(|| order.instrument_id());
559
560        let matching_core = self
561            .matching_cores
562            .borrow()
563            .get(&trigger_instrument_id)
564            .cloned();
565
566        let mut matching_core = if let Some(core) = matching_core {
567            core
568        } else {
569            // Handle synthetic instruments
570            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
571                let synthetic = self
572                    .cache
573                    .borrow()
574                    .synthetic(&trigger_instrument_id)
575                    .cloned();
576                if let Some(synthetic) = synthetic {
577                    (synthetic.id, synthetic.price_increment)
578                } else {
579                    log::error!(
580                        "Cannot emulate order: no synthetic instrument {} for trigger",
581                        trigger_instrument_id
582                    );
583                    self.manager_cancel_order(order.clone());
584                    return;
585                }
586            } else {
587                let instrument = self
588                    .cache
589                    .borrow()
590                    .instrument(&trigger_instrument_id)
591                    .cloned();
592                if let Some(instrument) = instrument {
593                    (instrument.id(), instrument.price_increment())
594                } else {
595                    log::error!(
596                        "Cannot emulate order: no instrument {} for trigger",
597                        trigger_instrument_id
598                    );
599                    self.manager_cancel_order(order.clone());
600                    return;
601                }
602            };
603
604            self.create_matching_core(instrument_id, price_increment)
605        };
606
607        // Update trailing stop
608        if matches!(
609            order.order_type(),
610            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
611        ) {
612            self.update_trailing_stop_order(&matching_core, &mut order);
613            if order.trigger_price().is_none() {
614                log::error!(
615                    "Cannot handle trailing stop order with no trigger_price and no market updates"
616                );
617
618                self.manager_cancel_order(order.clone());
619                return;
620            }
621        }
622
623        // Cache command
624        self.manager
625            .borrow_mut()
626            .cache_submit_order_command(command);
627
628        // Check if immediately marketable
629        matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
630
631        // Handle data subscriptions
632        match emulation_trigger.unwrap() {
633            TriggerType::Default | TriggerType::BidAsk => {
634                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
635                    if !trigger_instrument_id.is_synthetic() {
636                        // TODO: Impl Actor Trait
637                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
638                    }
639                    // TODO: Impl Actor Trait
640                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
641                    self.subscribed_quotes.insert(trigger_instrument_id);
642                }
643            }
644            TriggerType::LastPrice => {
645                if !self.subscribed_trades.contains(&trigger_instrument_id) {
646                    // TODO: Impl Actor Trait
647                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
648                    self.subscribed_trades.insert(trigger_instrument_id);
649                }
650            }
651            _ => {
652                log::error!("Invalid TriggerType: {:?}", emulation_trigger);
653                return;
654            }
655        }
656
657        // Check if order was already released
658        if !self
659            .manager
660            .borrow()
661            .get_submit_order_commands()
662            .contains_key(&order.client_order_id())
663        {
664            return; // Already released
665        }
666
667        // Hold in matching core
668        if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
669            log::error!("Cannot add order: {:?}", e);
670            return;
671        }
672
673        // Generate emulated event if needed
674        if order.status() == OrderStatus::Initialized {
675            let event = OrderEmulated::new(
676                order.trader_id(),
677                order.strategy_id(),
678                order.instrument_id(),
679                order.client_order_id(),
680                UUID4::new(),
681                self.clock.borrow().timestamp_ns(),
682                self.clock.borrow().timestamp_ns(),
683            );
684
685            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
686                log::error!("Cannot apply order event: {:?}", e);
687                return;
688            }
689
690            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
691                log::error!("Cannot update order: {:?}", e);
692                return;
693            }
694
695            self.manager
696                .borrow()
697                .send_risk_event(OrderEventAny::Emulated(event));
698
699            self.msgbus.borrow().publish(
700                &format!("events.order.{}", order.strategy_id()).into(),
701                &OrderEventAny::Emulated(event),
702            );
703        }
704
705        // Since we are cloning the matching core, we need to insert it back into the original hashmap
706        self.matching_cores
707            .borrow_mut()
708            .insert(trigger_instrument_id, matching_core);
709
710        log::info!("Emulating {}", order);
711    }
712
713    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
714        self.check_monitoring(command.strategy_id, command.position_id);
715
716        for order in &command.order_list.orders {
717            if let Some(parent_order_id) = order.parent_order_id() {
718                let cache = self.cache.borrow();
719                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
720                    parent_order
721                } else {
722                    log::error!("Parent order for {} not found", order.client_order_id());
723                    continue;
724                };
725
726                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
727                    continue; // Process contingency order later once parent triggered
728                }
729            }
730
731            if let Err(e) = self.manager.borrow_mut().create_new_submit_order(
732                order.clone(),
733                command.position_id,
734                Some(command.client_id),
735            ) {
736                log::error!("Error creating new submit order: {}", e);
737            }
738        }
739    }
740
741    fn handle_modify_order(&mut self, command: ModifyOrder) {
742        let cache = self.cache.borrow();
743        let order = if let Some(order) = cache.order(&command.client_order_id) {
744            order
745        } else {
746            log::error!("Cannot modify order: {} not found", command.client_order_id);
747            return;
748        };
749
750        let price = match command.price {
751            Some(price) => Some(price),
752            None => order.price(),
753        };
754
755        let trigger_price = match command.trigger_price {
756            Some(trigger_price) => Some(trigger_price),
757            None => order.trigger_price(),
758        };
759
760        // Generate event
761        let ts_now = self.clock.borrow().timestamp_ns();
762        let event = OrderUpdated::new(
763            order.trader_id(),
764            order.strategy_id(),
765            order.instrument_id(),
766            order.client_order_id(),
767            command.quantity.unwrap_or(order.quantity()),
768            UUID4::new(),
769            ts_now,
770            ts_now,
771            false,
772            order.venue_order_id(),
773            order.account_id(),
774            price,
775            trigger_price,
776        );
777
778        self.manager
779            .borrow()
780            .send_exec_event(OrderEventAny::Updated(event));
781
782        let trigger_instrument_id = order
783            .trigger_instrument_id()
784            .unwrap_or_else(|| order.instrument_id());
785
786        let borrowed_matching_cores = self.matching_cores.borrow();
787        let matching_core = if let Some(core) = borrowed_matching_cores.get(&trigger_instrument_id)
788        {
789            core
790        } else {
791            log::error!(
792                "Cannot handle `ModifyOrder`: no matching core for trigger instrument {}",
793                trigger_instrument_id
794            );
795            return;
796        };
797
798        matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
799
800        // TODO: fix
801        // match order.order_side() {
802        //     OrderSide::Buy => matching_core.get_orders_bid().sort(),
803        //     OrderSide::Sell => matching_core.get_orders_ask().sort(),
804        //     _ => return Err(anyhow::anyhow!("Invalid OrderSide")),
805        // }
806    }
807
808    fn handle_cancel_order(&mut self, command: CancelOrder) {
809        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
810            order.clone()
811        } else {
812            log::error!("Cannot cancel order: {} not found", command.client_order_id);
813            return;
814        };
815
816        let trigger_instrument_id = order
817            .trigger_instrument_id()
818            .unwrap_or_else(|| order.instrument_id());
819
820        let borrowed_matching_cores = self.matching_cores.borrow();
821        let matching_core = if let Some(core) = borrowed_matching_cores.get(&trigger_instrument_id)
822        {
823            core
824        } else {
825            drop(borrowed_matching_cores);
826            self.manager_cancel_order(order);
827            return;
828        };
829
830        if !matching_core.order_exists(order.client_order_id())
831            && order.is_open()
832            && !order.is_pending_cancel()
833        {
834            // Order not held in the emulator
835            self.manager
836                .borrow()
837                .send_exec_command(TradingCommand::CancelOrder(command));
838        } else {
839            drop(borrowed_matching_cores);
840            self.manager_cancel_order(order);
841        }
842    }
843
844    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
845        let borrowed_matching_cores = self.matching_cores.borrow();
846        let matching_core = match borrowed_matching_cores.get(&command.instrument_id) {
847            Some(core) => core,
848            None => return, // No orders to cancel
849        };
850
851        let orders_to_cancel = match command.order_side {
852            OrderSide::NoOrderSide => {
853                // Get both bid and ask orders
854                let mut all_orders = Vec::new();
855                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
856                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
857                all_orders
858            }
859            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
860            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
861        };
862
863        drop(borrowed_matching_cores);
864
865        // Process all orders in a single iteration
866        for order in orders_to_cancel {
867            let order: OrderAny = order.into();
868            self.manager_cancel_order(order);
869        }
870    }
871
872    // Cloned from manager to bypass few layers of handlers;
873    fn manager_cancel_order(&mut self, order: OrderAny) {
874        if self
875            .cache
876            .borrow()
877            .is_order_pending_cancel_local(&order.client_order_id())
878        {
879            return;
880        }
881
882        if order.is_closed() {
883            log::warn!("Cannot cancel order: already closed");
884            return;
885        }
886
887        self.manager
888            .borrow_mut()
889            .pop_submit_order_command(order.client_order_id());
890
891        // OrderEmulator.cancel_order
892        self.cancel_order(&order);
893    }
894
895    fn cancel_order(&mut self, order: &OrderAny) {
896        log::info!("Canceling order {}", order.client_order_id());
897
898        let mut order = order.clone();
899        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
900
901        let trigger_instrument_id = order
902            .trigger_instrument_id()
903            .unwrap_or(order.instrument_id());
904
905        if let Some(matching_core) = self
906            .matching_cores
907            .borrow_mut()
908            .get_mut(&trigger_instrument_id)
909        {
910            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
911                log::error!("Cannot delete order: {:?}", e);
912            }
913        }
914
915        self.cache
916            .borrow_mut()
917            .update_order_pending_cancel_local(&order);
918
919        // Generate event
920        let ts_now = self.clock.borrow().timestamp_ns();
921        let event = OrderCanceled::new(
922            order.trader_id(),
923            order.strategy_id(),
924            order.instrument_id(),
925            order.client_order_id(),
926            UUID4::new(),
927            ts_now,
928            ts_now,
929            false,
930            order.venue_order_id(),
931            order.account_id(),
932        );
933        self.manager
934            .borrow()
935            .send_exec_event(OrderEventAny::Canceled(event));
936    }
937
938    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
939        if !self.subscribed_strategies.contains(&strategy_id) {
940            let handler = {
941                let manager = self.manager.clone();
942                let cache = self.cache.clone();
943                let matching_cores = self.matching_cores.clone();
944                ShareableMessageHandler(Rc::new(OrderEmulatorEventHandler {
945                    id: Ustr::from(&UUID4::new().to_string()),
946                    callback: Box::new(move |event: &OrderEventAny| {
947                        Self::on_event(
948                            event.clone(),
949                            manager.clone(),
950                            cache.clone(),
951                            matching_cores.clone(),
952                        );
953                    }),
954                }))
955            };
956
957            // Subscribe to all strategy events
958            self.msgbus.borrow_mut().subscribe(
959                format!("events.order.{strategy_id}"),
960                handler.clone(),
961                None,
962            );
963            self.msgbus.borrow_mut().subscribe(
964                format!("events.position.{strategy_id}"),
965                handler,
966                None,
967            );
968
969            self.subscribed_strategies.insert(strategy_id);
970            log::info!(
971                "Subscribed to strategy {} order and position events",
972                strategy_id
973            );
974        }
975
976        if let Some(position_id) = position_id {
977            if !self.monitored_positions.contains(&position_id) {
978                self.monitored_positions.insert(position_id);
979            }
980        }
981    }
982
983    fn on_event(
984        event: OrderEventAny,
985        manager: Rc<RefCell<OrderManager>>,
986        cache: Rc<RefCell<Cache>>,
987        matching_cores: Rc<RefCell<HashMap<InstrumentId, OrderMatchingCore>>>,
988    ) {
989        log::info!("{RECV}{EVT} {event}");
990
991        manager.borrow_mut().handle_event(event.clone());
992
993        if let Some(order) = cache.borrow().order(&event.client_order_id()) {
994            if order.is_closed() {
995                if let Some(matching_core) =
996                    matching_cores.borrow_mut().get_mut(&order.instrument_id())
997                {
998                    if let Err(e) =
999                        matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
1000                    {
1001                        log::error!("Error deleting order: {}", e);
1002                    }
1003                }
1004            }
1005        }
1006        // else: Order not in cache yet
1007    }
1008
1009    fn create_matching_core(
1010        &mut self,
1011        instrument_id: InstrumentId,
1012        price_increment: Price,
1013    ) -> OrderMatchingCore {
1014        let matching_core =
1015            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
1016        self.matching_cores
1017            .borrow_mut()
1018            .insert(instrument_id, matching_core.clone());
1019        log::info!("Creating matching core for {:?}", instrument_id);
1020        matching_core
1021    }
1022
1023    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
1024        match order.order_type() {
1025            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
1026                self.fill_limit_order(order);
1027            }
1028            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
1029                self.fill_market_order(order);
1030            }
1031            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
1032        }
1033    }
1034
1035    fn fill_limit_order(&mut self, order: &mut OrderAny) {
1036        if matches!(order.order_type(), OrderType::Limit) {
1037            self.fill_market_order(order);
1038            return;
1039        }
1040
1041        // Fetch command
1042        let mut command = match self
1043            .manager
1044            .borrow_mut()
1045            .pop_submit_order_command(order.client_order_id())
1046        {
1047            Some(command) => command,
1048            None => return, // Order already released
1049        };
1050
1051        let trigger_instrument_id = order
1052            .trigger_instrument_id()
1053            .unwrap_or(order.instrument_id());
1054
1055        let mut matching_core = self
1056            .matching_cores
1057            .borrow()
1058            .get(&trigger_instrument_id)
1059            .cloned();
1060        if let Some(ref mut matching_core) = matching_core {
1061            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
1062                log::error!("Cannot delete order: {:?}", e);
1063            } else {
1064                // Update matching cores
1065                self.matching_cores
1066                    .borrow_mut()
1067                    .insert(trigger_instrument_id, matching_core.clone());
1068            }
1069        }
1070
1071        let emulation_trigger = TriggerType::NoTrigger;
1072
1073        // Transform order
1074        let mut transformed = if let Ok(transformed) = LimitOrder::new(
1075            order.trader_id(),
1076            order.strategy_id(),
1077            order.instrument_id(),
1078            order.client_order_id(),
1079            order.order_side(),
1080            order.quantity(),
1081            order.price().unwrap(),
1082            order.time_in_force(),
1083            order.expire_time(),
1084            order.is_post_only(),
1085            order.is_reduce_only(),
1086            order.is_quote_quantity(),
1087            order.display_qty(),
1088            Some(emulation_trigger),
1089            Some(trigger_instrument_id),
1090            order.contingency_type(),
1091            order.order_list_id(),
1092            order.linked_order_ids(),
1093            order.parent_order_id(),
1094            order.exec_algorithm_id(),
1095            order.exec_algorithm_params(),
1096            order.exec_spawn_id(),
1097            order.tags(),
1098            UUID4::new(),
1099            self.clock.borrow().timestamp_ns(),
1100        ) {
1101            transformed
1102        } else {
1103            log::error!("Cannot create limit order");
1104            return;
1105        };
1106
1107        transformed.liquidity_side = order.liquidity_side();
1108        // TODO: fix
1109        // let triggered_price = order.trigger_price();
1110        // if triggered_price.is_some() {
1111        //     transformed.trigger_price() = (triggered_price.unwrap());
1112        // }
1113
1114        let original_events = order.events();
1115
1116        for event in original_events {
1117            transformed.events.insert(0, event.clone());
1118        }
1119
1120        if let Err(e) = self.cache.borrow_mut().add_order(
1121            OrderAny::Limit(transformed.clone()),
1122            command.position_id,
1123            Some(command.client_id),
1124            true,
1125        ) {
1126            log::error!("Failed to add order: {}", e);
1127        }
1128
1129        // Replace commands order with transformed order
1130        command.order = OrderAny::Limit(transformed.clone());
1131
1132        self.msgbus.borrow().publish(
1133            &format!("events.order.{}", order.strategy_id()).into(),
1134            transformed.last_event(),
1135        );
1136
1137        // Determine triggered price
1138        // TODO: fix unwraps
1139        let released_price = match order.order_side() {
1140            OrderSide::Buy => matching_core.unwrap().ask,
1141            OrderSide::Sell => matching_core.unwrap().bid,
1142            _ => panic!("invalid `OrderSide`"),
1143        };
1144
1145        // Generate event
1146        let event = OrderReleased::new(
1147            order.trader_id(),
1148            order.strategy_id(),
1149            order.instrument_id(),
1150            order.client_order_id(),
1151            released_price.unwrap(),
1152            UUID4::new(),
1153            self.clock.borrow().timestamp_ns(),
1154            self.clock.borrow().timestamp_ns(),
1155        );
1156
1157        if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1158            log::error!("Failed to apply order event: {}", e);
1159        }
1160        if let Err(e) = self
1161            .cache
1162            .borrow_mut()
1163            .update_order(&OrderAny::Limit(transformed.clone()))
1164        {
1165            log::error!("Failed to update order: {}", e);
1166        }
1167
1168        self.manager
1169            .borrow()
1170            .send_risk_event(OrderEventAny::Released(event));
1171
1172        log::info!("Releasing order {}", order.client_order_id());
1173
1174        // Publish event
1175        self.msgbus.borrow().publish(
1176            &format!("events.order.{}", transformed.strategy_id()).into(),
1177            &OrderEventAny::Released(event),
1178        );
1179
1180        if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1181            self.manager
1182                .borrow()
1183                .send_algo_command(command, exec_algorithm_id);
1184        } else {
1185            self.manager
1186                .borrow()
1187                .send_exec_command(TradingCommand::SubmitOrder(command));
1188        }
1189    }
1190
1191    fn fill_market_order(&mut self, order: &mut OrderAny) {
1192        // Fetch command
1193        let mut command = match self
1194            .manager
1195            .borrow_mut()
1196            .pop_submit_order_command(order.client_order_id())
1197        {
1198            Some(command) => command,
1199            None => panic!("invalid operation `_fill_market_order` with no command"),
1200        };
1201
1202        let trigger_instrument_id = order
1203            .trigger_instrument_id()
1204            .unwrap_or(order.instrument_id());
1205
1206        let mut matching_core = self
1207            .matching_cores
1208            .borrow()
1209            .get(&trigger_instrument_id)
1210            .cloned();
1211        if let Some(ref mut matching_core) = matching_core {
1212            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
1213                log::error!("Cannot delete order: {:?}", e);
1214            } else {
1215                // Update matching cores
1216                self.matching_cores
1217                    .borrow_mut()
1218                    .insert(trigger_instrument_id, matching_core.clone());
1219            }
1220        }
1221
1222        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1223
1224        // Transform order
1225        let mut transformed = MarketOrder::new(
1226            order.trader_id(),
1227            order.strategy_id(),
1228            order.instrument_id(),
1229            order.client_order_id(),
1230            order.order_side(),
1231            order.quantity(),
1232            order.time_in_force(),
1233            UUID4::new(),
1234            self.clock.borrow().timestamp_ns(),
1235            order.is_reduce_only(),
1236            order.is_quote_quantity(),
1237            order.contingency_type(),
1238            order.order_list_id(),
1239            order.linked_order_ids(),
1240            order.parent_order_id(),
1241            order.exec_algorithm_id(),
1242            order.exec_algorithm_params(),
1243            order.exec_spawn_id(),
1244            order.tags(),
1245        );
1246
1247        let original_events = order.events();
1248
1249        for event in original_events {
1250            transformed.events.insert(0, event.clone());
1251        }
1252
1253        if let Err(e) = self.cache.borrow_mut().add_order(
1254            OrderAny::Market(transformed.clone()),
1255            command.position_id,
1256            Some(command.client_id),
1257            true,
1258        ) {
1259            log::error!("Failed to add order: {}", e);
1260        }
1261
1262        // Replace commands order with transformed order
1263        command.order = OrderAny::Market(transformed.clone());
1264
1265        self.msgbus.borrow().publish(
1266            &format!("events.order.{}", order.strategy_id()).into(),
1267            transformed.last_event(),
1268        );
1269
1270        // Determine triggered price
1271        // TODO: fix unwraps
1272        let released_price = match order.order_side() {
1273            OrderSide::Buy => matching_core.unwrap().ask,
1274            OrderSide::Sell => matching_core.unwrap().bid,
1275            _ => panic!("invalid `OrderSide`"),
1276        };
1277
1278        // Generate event
1279        let ts_now = self.clock.borrow().timestamp_ns();
1280        let event = OrderReleased::new(
1281            order.trader_id(),
1282            order.strategy_id(),
1283            order.instrument_id(),
1284            order.client_order_id(),
1285            released_price.unwrap(),
1286            UUID4::new(),
1287            ts_now,
1288            ts_now,
1289        );
1290
1291        if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1292            log::error!("Failed to apply order event: {}", e);
1293        }
1294
1295        if let Err(e) = self
1296            .cache
1297            .borrow_mut()
1298            .update_order(&OrderAny::Market(transformed))
1299        {
1300            log::error!("Failed to update order: {}", e);
1301        }
1302        self.manager
1303            .borrow()
1304            .send_risk_event(OrderEventAny::Released(event));
1305
1306        log::info!("Releasing order {}", order.client_order_id());
1307
1308        // Publish event
1309        self.msgbus.borrow().publish(
1310            &format!("events.order.{}", order.strategy_id()).into(),
1311            &OrderEventAny::Released(event),
1312        );
1313
1314        if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1315            self.manager
1316                .borrow()
1317                .send_algo_command(command, exec_algorithm_id);
1318        } else {
1319            self.manager
1320                .borrow()
1321                .send_exec_command(TradingCommand::SubmitOrder(command));
1322        }
1323    }
1324
1325    fn update_trailing_stop_order(&self, matching_core: &OrderMatchingCore, order: &mut OrderAny) {
1326        let mut bid = None;
1327        let mut ask = None;
1328        let mut last = None;
1329
1330        if matching_core.is_bid_initialized {
1331            bid = matching_core.bid;
1332        }
1333        if matching_core.is_ask_initialized {
1334            ask = matching_core.ask;
1335        }
1336        if matching_core.is_last_initialized {
1337            last = matching_core.last;
1338        }
1339
1340        let quote_tick = self
1341            .cache
1342            .borrow()
1343            .quote(&matching_core.instrument_id)
1344            .copied();
1345        let trade_tick = self
1346            .cache
1347            .borrow()
1348            .trade(&matching_core.instrument_id)
1349            .copied();
1350
1351        if bid.is_none() && quote_tick.is_some() {
1352            bid = Some(quote_tick.unwrap().bid_price);
1353        }
1354        if ask.is_none() && quote_tick.is_some() {
1355            ask = Some(quote_tick.unwrap().ask_price);
1356        }
1357        if last.is_none() && trade_tick.is_some() {
1358            last = Some(trade_tick.unwrap().price);
1359        }
1360
1361        let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1362            trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1363        {
1364            (new_trigger_price, new_price)
1365        } else {
1366            log::warn!("Cannot calculate trailing stop order");
1367            return;
1368        };
1369
1370        let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1371            (None, None) => return, // No updates
1372            _ => (new_trigger_price, new_price),
1373        };
1374
1375        // Generate event
1376        let ts_now = self.clock.borrow().timestamp_ns();
1377        let event = OrderUpdated::new(
1378            order.trader_id(),
1379            order.strategy_id(),
1380            order.instrument_id(),
1381            order.client_order_id(),
1382            order.quantity(),
1383            UUID4::new(),
1384            ts_now,
1385            ts_now,
1386            false,
1387            order.venue_order_id(),
1388            order.account_id(),
1389            new_price,
1390            new_trigger_price,
1391        );
1392
1393        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1394            log::error!("Failed to apply order event: {}", e);
1395        }
1396        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1397            log::error!("Failed to update order: {}", e);
1398        }
1399
1400        self.manager
1401            .borrow()
1402            .send_risk_event(OrderEventAny::Updated(event));
1403    }
1404}