1use std::{
17 any::Any,
18 cell::RefCell,
19 collections::{HashMap, HashSet},
20 rc::Rc,
21};
22
23use anyhow::Result;
24use nautilus_common::{
25 cache::Cache,
26 clock::Clock,
27 logging::{CMD, EVT, RECV},
28 messages::data::DataResponse,
29 msgbus::{
30 handler::{MessageHandler, ShareableMessageHandler},
31 MessageBus,
32 },
33};
34use nautilus_core::uuid::UUID4;
35use nautilus_model::{
36 data::{Data, OrderBookDeltas, QuoteTick, TradeTick},
37 enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
38 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
39 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
40 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
41 types::{Price, Quantity},
42};
43use ustr::Ustr;
44
45use crate::{
46 manager::OrderManager,
47 matching_core::OrderMatchingCore,
48 messages::{
49 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
50 },
51 trailing::trailing_stop_calculate,
52};
53
/// Order emulator front-end.
///
/// Holds shared handles to the clock, cache, message bus and order manager,
/// together with the mutable emulator state (matching cores and subscription
/// sets) that the message bus handlers also reference.
pub struct OrderEmulator {
    /// Shared clock used to timestamp generated events and commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, positions, client IDs and order books.
    cache: Rc<RefCell<Cache>>,
    /// Shared message bus handle; stored but not read by the methods of this type.
    _msgbus: Rc<RefCell<MessageBus>>,
    /// Shared order manager (submit-command bookkeeping and event dispatch).
    manager: Rc<RefCell<OrderManager>>,
    /// Shared emulator state, also captured by the `OrderEmulator.execute` handler.
    state: Rc<RefCell<OrderEmulatorState>>,
}
61
/// Message handler that forwards received [`TradingCommand`]s to a callback.
struct OrderEmulatorExecuteHandler {
    /// Identifier returned from `MessageHandler::id`.
    id: Ustr,
    /// Callback invoked with every trading command passed to `handle`.
    callback: Box<dyn Fn(&TradingCommand)>,
}
66
67impl MessageHandler for OrderEmulatorExecuteHandler {
68 fn id(&self) -> Ustr {
69 self.id
70 }
71
72 fn handle(&self, msg: &dyn Any) {
73 (self.callback)(msg.downcast_ref::<&TradingCommand>().unwrap());
74 }
75 fn handle_response(&self, _resp: DataResponse) {}
76 fn handle_data(&self, _data: Data) {}
77 fn as_any(&self) -> &dyn Any {
78 self
79 }
80}
81
82impl OrderEmulator {
83 pub fn new(
84 clock: Rc<RefCell<dyn Clock>>,
85 cache: Rc<RefCell<Cache>>,
86 msgbus: Rc<RefCell<MessageBus>>,
87 ) -> Self {
88 let active_local = true;
92 let manager = Rc::new(RefCell::new(OrderManager::new(
93 clock.clone(),
94 msgbus.clone(),
95 cache.clone(),
96 active_local,
97 None,
98 None,
99 None,
100 )));
101 let state = Rc::new(RefCell::new(OrderEmulatorState::new(
102 clock.clone(),
103 cache.clone(),
104 msgbus.clone(),
105 manager.clone(),
106 )));
107
108 let handler = {
109 let state = state.clone();
110 ShareableMessageHandler(Rc::new(OrderEmulatorExecuteHandler {
111 id: Ustr::from(&UUID4::new().to_string()),
112 callback: Box::new(move |command: &TradingCommand| {
113 state.borrow_mut().execute(command.clone());
114 }),
115 }))
116 };
117
118 msgbus
119 .borrow_mut()
120 .register("OrderEmulator.execute", handler);
121
122 Self {
123 clock,
124 cache,
125 _msgbus: msgbus,
126 manager,
127 state,
128 }
129 }
130
131 #[must_use]
132 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
133 let mut quotes: Vec<InstrumentId> = self
134 .state
135 .borrow()
136 .subscribed_quotes
137 .iter()
138 .copied()
139 .collect();
140 quotes.sort();
141 quotes
142 }
143
144 #[must_use]
145 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
146 let mut trades: Vec<_> = self
147 .state
148 .borrow()
149 .subscribed_trades
150 .iter()
151 .copied()
152 .collect();
153 trades.sort();
154 trades
155 }
156
157 #[must_use]
158 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
159 self.manager.borrow().get_submit_order_commands()
160 }
161
162 #[must_use]
163 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
164 self.state
165 .borrow()
166 .matching_cores
167 .borrow()
168 .get(instrument_id)
169 .cloned()
170 }
171
172 pub fn on_start(&mut self) -> Result<()> {
174 let emulated_orders: Vec<OrderAny> = self
175 .cache
176 .borrow()
177 .orders_emulated(None, None, None, None)
178 .into_iter()
179 .cloned()
180 .collect();
181
182 if emulated_orders.is_empty() {
183 log::error!("No emulated orders to reactivate");
184 return Ok(());
185 }
186
187 for order in emulated_orders {
188 if !matches!(
189 order.status(),
190 OrderStatus::Initialized | OrderStatus::Emulated
191 ) {
192 continue; }
194
195 if let Some(parent_order_id) = &order.parent_order_id() {
196 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
197 order.clone()
198 } else {
199 log::error!("Cannot handle order: parent {parent_order_id} not found");
200 continue;
201 };
202
203 let is_position_closed = parent_order
204 .position_id()
205 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
206 if parent_order.is_closed() && is_position_closed {
207 self.state.borrow_mut().manager_cancel_order(order.clone());
208 continue; }
210
211 if parent_order.contingency_type() == Some(ContingencyType::Oto)
212 && (parent_order.is_active_local()
213 || parent_order.filled_qty() == Quantity::zero(0))
214 {
215 continue; }
217 }
218
219 let position_id = self
220 .cache
221 .borrow()
222 .position_id(&order.client_order_id())
223 .copied();
224 let client_id = self
225 .cache
226 .borrow()
227 .client_id(&order.client_order_id())
228 .copied();
229
230 let command = match SubmitOrder::new(
231 order.trader_id(),
232 client_id.unwrap(),
233 order.strategy_id(),
234 order.instrument_id(),
235 order.client_order_id(),
236 order.venue_order_id().unwrap(),
237 order.clone(),
238 order.exec_algorithm_id(),
239 position_id,
240 UUID4::new(),
241 self.clock.borrow().timestamp_ns(),
242 ) {
243 Ok(command) => command,
244 Err(e) => {
245 log::error!("Cannot create submit order command: {}", e);
246 continue;
247 }
248 };
249
250 self.state.borrow_mut().handle_submit_order(command);
251 }
252
253 Ok(())
254 }
255
    /// Stop hook; currently does nothing.
    pub const fn on_stop(&self) {}
257
258 pub fn on_reset(&mut self) {
259 self.manager.borrow_mut().reset();
260 self.state.borrow_mut().matching_cores.borrow_mut().clear();
261 }
262
    /// Dispose hook; currently does nothing.
    pub const fn on_dispose(&self) {}
264
265 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
268 log::info!(
269 "Updating order {} quantity to {}",
270 order.client_order_id(),
271 new_quantity
272 );
273
274 let ts_now = self.clock.borrow().timestamp_ns();
276 let event = OrderUpdated::new(
277 order.trader_id(),
278 order.strategy_id(),
279 order.instrument_id(),
280 order.client_order_id(),
281 new_quantity,
282 UUID4::new(),
283 ts_now,
284 ts_now,
285 false,
286 None,
287 order.account_id(),
288 None,
289 None,
290 );
291
292 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
293 log::error!("Cannot apply order event: {:?}", e);
294 return;
295 }
296 if let Err(e) = self.cache.borrow_mut().update_order(order) {
297 log::error!("Cannot update order: {:?}", e);
298 return;
299 }
300
301 self.manager
302 .borrow()
303 .send_risk_event(OrderEventAny::Updated(event));
304 }
305
306 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
309 log::debug!("Processing OrderBookDeltas:{}", deltas);
310
311 let mut matching_core = if let Some(matching_core) = self
312 .state
313 .borrow()
314 .matching_cores
315 .borrow()
316 .get(&deltas.instrument_id)
317 {
318 matching_core.clone()
319 } else {
320 log::error!(
321 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
322 deltas.instrument_id
323 );
324 return;
325 };
326
327 let borrowed_cache = self.cache.borrow();
328 let book = if let Some(book) = borrowed_cache.order_book(&deltas.instrument_id) {
329 book
330 } else {
331 log::error!(
332 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
333 deltas.instrument_id
334 );
335 return;
336 };
337
338 let best_bid = book.best_bid_price();
339 let best_ask = book.best_ask_price();
340
341 if let Some(best_bid) = best_bid {
342 matching_core.set_bid_raw(best_bid);
343 }
344
345 if let Some(best_ask) = best_ask {
346 matching_core.set_ask_raw(best_ask);
347 }
348
349 drop(borrowed_cache);
350 self.iterate_orders(&mut matching_core);
351
352 self.state
353 .borrow_mut()
354 .matching_cores
355 .borrow_mut()
356 .insert(deltas.instrument_id, matching_core);
357 }
358
359 pub fn on_quote_tick(&mut self, tick: QuoteTick) {
360 log::debug!("Processing QuoteTick:{}", tick);
361
362 let mut matching_core = if let Some(matching_core) = self
363 .state
364 .borrow()
365 .matching_cores
366 .borrow()
367 .get(&tick.instrument_id)
368 {
369 matching_core.clone()
370 } else {
371 log::error!(
372 "Cannot handle `QuoteTick`: no matching core for instrument {}",
373 tick.instrument_id
374 );
375 return;
376 };
377
378 matching_core.set_bid_raw(tick.bid_price);
379 matching_core.set_ask_raw(tick.ask_price);
380
381 self.iterate_orders(&mut matching_core);
382
383 self.state
384 .borrow_mut()
385 .matching_cores
386 .borrow_mut()
387 .insert(tick.instrument_id, matching_core);
388 }
389
390 pub fn on_trade_tick(&mut self, tick: TradeTick) {
391 log::debug!("Processing TradeTick:{}", tick);
392
393 let borrowed_state = self.state.borrow();
394 let mut matching_core = if let Some(matching_core) = borrowed_state
395 .matching_cores
396 .borrow()
397 .get(&tick.instrument_id)
398 {
399 matching_core.clone()
400 } else {
401 log::error!(
402 "Cannot handle `TradeTick`: no matching core for instrument {}",
403 tick.instrument_id
404 );
405 return;
406 };
407
408 matching_core.set_last_raw(tick.price);
409 if !self
410 .state
411 .borrow()
412 .subscribed_quotes
413 .contains(&tick.instrument_id)
414 {
415 matching_core.set_bid_raw(tick.price);
416 matching_core.set_ask_raw(tick.price);
417 }
418
419 drop(borrowed_state);
420 self.iterate_orders(&mut matching_core);
421
422 self.state
423 .borrow_mut()
424 .matching_cores
425 .borrow_mut()
426 .insert(tick.instrument_id, matching_core);
427 }
428
429 pub fn on_event(&mut self, event: OrderEventAny) {
430 OrderEmulatorState::on_event(
431 event,
432 self.manager.clone(),
433 self.cache.clone(),
434 self.state.borrow().matching_cores.clone(),
435 );
436 }
437
438 fn iterate_orders(&mut self, matching_core: &mut OrderMatchingCore) {
439 matching_core.iterate();
440
441 let orders = matching_core.get_orders_ask().iter().cloned();
442 for order in orders {
443 if order.is_closed() {
444 continue;
445 }
446
447 let mut order: OrderAny = order.clone().into();
448 if matches!(
449 order.order_type(),
450 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
451 ) {
452 self.state
453 .borrow_mut()
454 .update_trailing_stop_order(matching_core, &mut order);
455 }
456 }
457 }
458}
459
/// Message handler that forwards received [`OrderEventAny`] events to a callback.
struct OrderEmulatorEventHandler {
    /// Identifier returned from `MessageHandler::id`.
    id: Ustr,
    /// Callback invoked with every order event passed to `handle`.
    callback: Box<dyn Fn(&OrderEventAny)>,
}
464
465impl MessageHandler for OrderEmulatorEventHandler {
466 fn id(&self) -> Ustr {
467 self.id
468 }
469
470 fn handle(&self, msg: &dyn Any) {
471 (self.callback)(msg.downcast_ref::<&OrderEventAny>().unwrap());
472 }
473 fn handle_response(&self, _resp: DataResponse) {}
474 fn handle_data(&self, _data: Data) {}
475 fn as_any(&self) -> &dyn Any {
476 self
477 }
478}
479
/// Mutable state of the order emulator: shared service handles, the matching
/// cores and the sets tracking what has already been subscribed or monitored.
pub struct OrderEmulatorState {
    /// Shared clock used to timestamp generated events.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried and updated for orders, instruments and ticks.
    cache: Rc<RefCell<Cache>>,
    /// Shared message bus used to publish order events and subscribe handlers.
    msgbus: Rc<RefCell<MessageBus>>,
    /// Shared order manager (submit-command bookkeeping and event dispatch).
    manager: Rc<RefCell<OrderManager>>,
    /// Matching cores; the submit path inserts them keyed by trigger instrument ID.
    matching_cores: Rc<RefCell<HashMap<InstrumentId, OrderMatchingCore>>>,
    /// Trigger instruments recorded for `Default`/`BidAsk` emulation triggers.
    subscribed_quotes: HashSet<InstrumentId>,
    /// Trigger instruments recorded for `LastPrice` emulation triggers.
    subscribed_trades: HashSet<InstrumentId>,
    /// Strategies whose order and position event topics have been subscribed.
    subscribed_strategies: HashSet<StrategyId>,
    /// Position IDs seen on submitted commands.
    monitored_positions: HashSet<PositionId>,
}
491
492impl OrderEmulatorState {
493 pub fn new(
494 clock: Rc<RefCell<dyn Clock>>,
495 cache: Rc<RefCell<Cache>>,
496 msgbus: Rc<RefCell<MessageBus>>,
497 manager: Rc<RefCell<OrderManager>>,
498 ) -> Self {
499 Self {
500 manager,
501 matching_cores: Rc::new(RefCell::new(HashMap::new())),
502 subscribed_quotes: HashSet::new(),
503 subscribed_trades: HashSet::new(),
504 subscribed_strategies: HashSet::new(),
505 monitored_positions: HashSet::new(),
506 clock,
507 cache,
508 msgbus,
509 }
510 }
511
512 pub fn execute(&mut self, command: TradingCommand) {
513 log::info!("{RECV}{CMD} {command}");
514
515 match command {
516 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
517 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
518 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
519 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
520 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
521 _ => log::error!("Cannot handle command: unrecognized {:?}", command),
522 }
523 }
524
525 fn handle_submit_order(&mut self, command: SubmitOrder) {
526 let mut order = command.order.clone();
527 let emulation_trigger = order.emulation_trigger();
528
529 assert!(
530 emulation_trigger != Some(TriggerType::NoTrigger),
531 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
532 );
533 assert!(
534 self.manager
535 .borrow()
536 .get_submit_order_commands()
537 .contains_key(&order.client_order_id()),
538 "command.order.client_order_id must be in submit_order_commands"
539 );
540
541 if !matches!(
542 emulation_trigger,
543 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
544 ) {
545 log::error!(
546 "Cannot emulate order: `TriggerType` {:?} not supported",
547 emulation_trigger
548 );
549 self.manager_cancel_order(order.clone());
550 return;
551 }
552
553 self.check_monitoring(command.strategy_id, command.position_id);
554
555 let trigger_instrument_id = order
557 .trigger_instrument_id()
558 .unwrap_or_else(|| order.instrument_id());
559
560 let matching_core = self
561 .matching_cores
562 .borrow()
563 .get(&trigger_instrument_id)
564 .cloned();
565
566 let mut matching_core = if let Some(core) = matching_core {
567 core
568 } else {
569 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
571 let synthetic = self
572 .cache
573 .borrow()
574 .synthetic(&trigger_instrument_id)
575 .cloned();
576 if let Some(synthetic) = synthetic {
577 (synthetic.id, synthetic.price_increment)
578 } else {
579 log::error!(
580 "Cannot emulate order: no synthetic instrument {} for trigger",
581 trigger_instrument_id
582 );
583 self.manager_cancel_order(order.clone());
584 return;
585 }
586 } else {
587 let instrument = self
588 .cache
589 .borrow()
590 .instrument(&trigger_instrument_id)
591 .cloned();
592 if let Some(instrument) = instrument {
593 (instrument.id(), instrument.price_increment())
594 } else {
595 log::error!(
596 "Cannot emulate order: no instrument {} for trigger",
597 trigger_instrument_id
598 );
599 self.manager_cancel_order(order.clone());
600 return;
601 }
602 };
603
604 self.create_matching_core(instrument_id, price_increment)
605 };
606
607 if matches!(
609 order.order_type(),
610 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
611 ) {
612 self.update_trailing_stop_order(&matching_core, &mut order);
613 if order.trigger_price().is_none() {
614 log::error!(
615 "Cannot handle trailing stop order with no trigger_price and no market updates"
616 );
617
618 self.manager_cancel_order(order.clone());
619 return;
620 }
621 }
622
623 self.manager
625 .borrow_mut()
626 .cache_submit_order_command(command);
627
628 matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
630
631 match emulation_trigger.unwrap() {
633 TriggerType::Default | TriggerType::BidAsk => {
634 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
635 if !trigger_instrument_id.is_synthetic() {
636 }
639 self.subscribed_quotes.insert(trigger_instrument_id);
642 }
643 }
644 TriggerType::LastPrice => {
645 if !self.subscribed_trades.contains(&trigger_instrument_id) {
646 self.subscribed_trades.insert(trigger_instrument_id);
649 }
650 }
651 _ => {
652 log::error!("Invalid TriggerType: {:?}", emulation_trigger);
653 return;
654 }
655 }
656
657 if !self
659 .manager
660 .borrow()
661 .get_submit_order_commands()
662 .contains_key(&order.client_order_id())
663 {
664 return; }
666
667 if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
669 log::error!("Cannot add order: {:?}", e);
670 return;
671 }
672
673 if order.status() == OrderStatus::Initialized {
675 let event = OrderEmulated::new(
676 order.trader_id(),
677 order.strategy_id(),
678 order.instrument_id(),
679 order.client_order_id(),
680 UUID4::new(),
681 self.clock.borrow().timestamp_ns(),
682 self.clock.borrow().timestamp_ns(),
683 );
684
685 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
686 log::error!("Cannot apply order event: {:?}", e);
687 return;
688 }
689
690 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
691 log::error!("Cannot update order: {:?}", e);
692 return;
693 }
694
695 self.manager
696 .borrow()
697 .send_risk_event(OrderEventAny::Emulated(event));
698
699 self.msgbus.borrow().publish(
700 &format!("events.order.{}", order.strategy_id()).into(),
701 &OrderEventAny::Emulated(event),
702 );
703 }
704
705 self.matching_cores
707 .borrow_mut()
708 .insert(trigger_instrument_id, matching_core);
709
710 log::info!("Emulating {}", order);
711 }
712
713 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
714 self.check_monitoring(command.strategy_id, command.position_id);
715
716 for order in &command.order_list.orders {
717 if let Some(parent_order_id) = order.parent_order_id() {
718 let cache = self.cache.borrow();
719 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
720 parent_order
721 } else {
722 log::error!("Parent order for {} not found", order.client_order_id());
723 continue;
724 };
725
726 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
727 continue; }
729 }
730
731 if let Err(e) = self.manager.borrow_mut().create_new_submit_order(
732 order.clone(),
733 command.position_id,
734 Some(command.client_id),
735 ) {
736 log::error!("Error creating new submit order: {}", e);
737 }
738 }
739 }
740
741 fn handle_modify_order(&mut self, command: ModifyOrder) {
742 let cache = self.cache.borrow();
743 let order = if let Some(order) = cache.order(&command.client_order_id) {
744 order
745 } else {
746 log::error!("Cannot modify order: {} not found", command.client_order_id);
747 return;
748 };
749
750 let price = match command.price {
751 Some(price) => Some(price),
752 None => order.price(),
753 };
754
755 let trigger_price = match command.trigger_price {
756 Some(trigger_price) => Some(trigger_price),
757 None => order.trigger_price(),
758 };
759
760 let ts_now = self.clock.borrow().timestamp_ns();
762 let event = OrderUpdated::new(
763 order.trader_id(),
764 order.strategy_id(),
765 order.instrument_id(),
766 order.client_order_id(),
767 command.quantity.unwrap_or(order.quantity()),
768 UUID4::new(),
769 ts_now,
770 ts_now,
771 false,
772 order.venue_order_id(),
773 order.account_id(),
774 price,
775 trigger_price,
776 );
777
778 self.manager
779 .borrow()
780 .send_exec_event(OrderEventAny::Updated(event));
781
782 let trigger_instrument_id = order
783 .trigger_instrument_id()
784 .unwrap_or_else(|| order.instrument_id());
785
786 let borrowed_matching_cores = self.matching_cores.borrow();
787 let matching_core = if let Some(core) = borrowed_matching_cores.get(&trigger_instrument_id)
788 {
789 core
790 } else {
791 log::error!(
792 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {}",
793 trigger_instrument_id
794 );
795 return;
796 };
797
798 matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
799
800 }
807
808 fn handle_cancel_order(&mut self, command: CancelOrder) {
809 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
810 order.clone()
811 } else {
812 log::error!("Cannot cancel order: {} not found", command.client_order_id);
813 return;
814 };
815
816 let trigger_instrument_id = order
817 .trigger_instrument_id()
818 .unwrap_or_else(|| order.instrument_id());
819
820 let borrowed_matching_cores = self.matching_cores.borrow();
821 let matching_core = if let Some(core) = borrowed_matching_cores.get(&trigger_instrument_id)
822 {
823 core
824 } else {
825 drop(borrowed_matching_cores);
826 self.manager_cancel_order(order);
827 return;
828 };
829
830 if !matching_core.order_exists(order.client_order_id())
831 && order.is_open()
832 && !order.is_pending_cancel()
833 {
834 self.manager
836 .borrow()
837 .send_exec_command(TradingCommand::CancelOrder(command));
838 } else {
839 drop(borrowed_matching_cores);
840 self.manager_cancel_order(order);
841 }
842 }
843
844 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
845 let borrowed_matching_cores = self.matching_cores.borrow();
846 let matching_core = match borrowed_matching_cores.get(&command.instrument_id) {
847 Some(core) => core,
848 None => return, };
850
851 let orders_to_cancel = match command.order_side {
852 OrderSide::NoOrderSide => {
853 let mut all_orders = Vec::new();
855 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
856 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
857 all_orders
858 }
859 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
860 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
861 };
862
863 drop(borrowed_matching_cores);
864
865 for order in orders_to_cancel {
867 let order: OrderAny = order.into();
868 self.manager_cancel_order(order);
869 }
870 }
871
872 fn manager_cancel_order(&mut self, order: OrderAny) {
874 if self
875 .cache
876 .borrow()
877 .is_order_pending_cancel_local(&order.client_order_id())
878 {
879 return;
880 }
881
882 if order.is_closed() {
883 log::warn!("Cannot cancel order: already closed");
884 return;
885 }
886
887 self.manager
888 .borrow_mut()
889 .pop_submit_order_command(order.client_order_id());
890
891 self.cancel_order(&order);
893 }
894
895 fn cancel_order(&mut self, order: &OrderAny) {
896 log::info!("Canceling order {}", order.client_order_id());
897
898 let mut order = order.clone();
899 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
900
901 let trigger_instrument_id = order
902 .trigger_instrument_id()
903 .unwrap_or(order.instrument_id());
904
905 if let Some(matching_core) = self
906 .matching_cores
907 .borrow_mut()
908 .get_mut(&trigger_instrument_id)
909 {
910 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
911 log::error!("Cannot delete order: {:?}", e);
912 }
913 }
914
915 self.cache
916 .borrow_mut()
917 .update_order_pending_cancel_local(&order);
918
919 let ts_now = self.clock.borrow().timestamp_ns();
921 let event = OrderCanceled::new(
922 order.trader_id(),
923 order.strategy_id(),
924 order.instrument_id(),
925 order.client_order_id(),
926 UUID4::new(),
927 ts_now,
928 ts_now,
929 false,
930 order.venue_order_id(),
931 order.account_id(),
932 );
933 self.manager
934 .borrow()
935 .send_exec_event(OrderEventAny::Canceled(event));
936 }
937
938 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
939 if !self.subscribed_strategies.contains(&strategy_id) {
940 let handler = {
941 let manager = self.manager.clone();
942 let cache = self.cache.clone();
943 let matching_cores = self.matching_cores.clone();
944 ShareableMessageHandler(Rc::new(OrderEmulatorEventHandler {
945 id: Ustr::from(&UUID4::new().to_string()),
946 callback: Box::new(move |event: &OrderEventAny| {
947 Self::on_event(
948 event.clone(),
949 manager.clone(),
950 cache.clone(),
951 matching_cores.clone(),
952 );
953 }),
954 }))
955 };
956
957 self.msgbus.borrow_mut().subscribe(
959 format!("events.order.{strategy_id}"),
960 handler.clone(),
961 None,
962 );
963 self.msgbus.borrow_mut().subscribe(
964 format!("events.position.{strategy_id}"),
965 handler,
966 None,
967 );
968
969 self.subscribed_strategies.insert(strategy_id);
970 log::info!(
971 "Subscribed to strategy {} order and position events",
972 strategy_id
973 );
974 }
975
976 if let Some(position_id) = position_id {
977 if !self.monitored_positions.contains(&position_id) {
978 self.monitored_positions.insert(position_id);
979 }
980 }
981 }
982
983 fn on_event(
984 event: OrderEventAny,
985 manager: Rc<RefCell<OrderManager>>,
986 cache: Rc<RefCell<Cache>>,
987 matching_cores: Rc<RefCell<HashMap<InstrumentId, OrderMatchingCore>>>,
988 ) {
989 log::info!("{RECV}{EVT} {event}");
990
991 manager.borrow_mut().handle_event(event.clone());
992
993 if let Some(order) = cache.borrow().order(&event.client_order_id()) {
994 if order.is_closed() {
995 if let Some(matching_core) =
996 matching_cores.borrow_mut().get_mut(&order.instrument_id())
997 {
998 if let Err(e) =
999 matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
1000 {
1001 log::error!("Error deleting order: {}", e);
1002 }
1003 }
1004 }
1005 }
1006 }
1008
1009 fn create_matching_core(
1010 &mut self,
1011 instrument_id: InstrumentId,
1012 price_increment: Price,
1013 ) -> OrderMatchingCore {
1014 let matching_core =
1015 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
1016 self.matching_cores
1017 .borrow_mut()
1018 .insert(instrument_id, matching_core.clone());
1019 log::info!("Creating matching core for {:?}", instrument_id);
1020 matching_core
1021 }
1022
1023 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
1024 match order.order_type() {
1025 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
1026 self.fill_limit_order(order);
1027 }
1028 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
1029 self.fill_market_order(order);
1030 }
1031 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
1032 }
1033 }
1034
1035 fn fill_limit_order(&mut self, order: &mut OrderAny) {
1036 if matches!(order.order_type(), OrderType::Limit) {
1037 self.fill_market_order(order);
1038 return;
1039 }
1040
1041 let mut command = match self
1043 .manager
1044 .borrow_mut()
1045 .pop_submit_order_command(order.client_order_id())
1046 {
1047 Some(command) => command,
1048 None => return, };
1050
1051 let trigger_instrument_id = order
1052 .trigger_instrument_id()
1053 .unwrap_or(order.instrument_id());
1054
1055 let mut matching_core = self
1056 .matching_cores
1057 .borrow()
1058 .get(&trigger_instrument_id)
1059 .cloned();
1060 if let Some(ref mut matching_core) = matching_core {
1061 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
1062 log::error!("Cannot delete order: {:?}", e);
1063 } else {
1064 self.matching_cores
1066 .borrow_mut()
1067 .insert(trigger_instrument_id, matching_core.clone());
1068 }
1069 }
1070
1071 let emulation_trigger = TriggerType::NoTrigger;
1072
1073 let mut transformed = if let Ok(transformed) = LimitOrder::new(
1075 order.trader_id(),
1076 order.strategy_id(),
1077 order.instrument_id(),
1078 order.client_order_id(),
1079 order.order_side(),
1080 order.quantity(),
1081 order.price().unwrap(),
1082 order.time_in_force(),
1083 order.expire_time(),
1084 order.is_post_only(),
1085 order.is_reduce_only(),
1086 order.is_quote_quantity(),
1087 order.display_qty(),
1088 Some(emulation_trigger),
1089 Some(trigger_instrument_id),
1090 order.contingency_type(),
1091 order.order_list_id(),
1092 order.linked_order_ids(),
1093 order.parent_order_id(),
1094 order.exec_algorithm_id(),
1095 order.exec_algorithm_params(),
1096 order.exec_spawn_id(),
1097 order.tags(),
1098 UUID4::new(),
1099 self.clock.borrow().timestamp_ns(),
1100 ) {
1101 transformed
1102 } else {
1103 log::error!("Cannot create limit order");
1104 return;
1105 };
1106
1107 transformed.liquidity_side = order.liquidity_side();
1108 let original_events = order.events();
1115
1116 for event in original_events {
1117 transformed.events.insert(0, event.clone());
1118 }
1119
1120 if let Err(e) = self.cache.borrow_mut().add_order(
1121 OrderAny::Limit(transformed.clone()),
1122 command.position_id,
1123 Some(command.client_id),
1124 true,
1125 ) {
1126 log::error!("Failed to add order: {}", e);
1127 }
1128
1129 command.order = OrderAny::Limit(transformed.clone());
1131
1132 self.msgbus.borrow().publish(
1133 &format!("events.order.{}", order.strategy_id()).into(),
1134 transformed.last_event(),
1135 );
1136
1137 let released_price = match order.order_side() {
1140 OrderSide::Buy => matching_core.unwrap().ask,
1141 OrderSide::Sell => matching_core.unwrap().bid,
1142 _ => panic!("invalid `OrderSide`"),
1143 };
1144
1145 let event = OrderReleased::new(
1147 order.trader_id(),
1148 order.strategy_id(),
1149 order.instrument_id(),
1150 order.client_order_id(),
1151 released_price.unwrap(),
1152 UUID4::new(),
1153 self.clock.borrow().timestamp_ns(),
1154 self.clock.borrow().timestamp_ns(),
1155 );
1156
1157 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1158 log::error!("Failed to apply order event: {}", e);
1159 }
1160 if let Err(e) = self
1161 .cache
1162 .borrow_mut()
1163 .update_order(&OrderAny::Limit(transformed.clone()))
1164 {
1165 log::error!("Failed to update order: {}", e);
1166 }
1167
1168 self.manager
1169 .borrow()
1170 .send_risk_event(OrderEventAny::Released(event));
1171
1172 log::info!("Releasing order {}", order.client_order_id());
1173
1174 self.msgbus.borrow().publish(
1176 &format!("events.order.{}", transformed.strategy_id()).into(),
1177 &OrderEventAny::Released(event),
1178 );
1179
1180 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1181 self.manager
1182 .borrow()
1183 .send_algo_command(command, exec_algorithm_id);
1184 } else {
1185 self.manager
1186 .borrow()
1187 .send_exec_command(TradingCommand::SubmitOrder(command));
1188 }
1189 }
1190
1191 fn fill_market_order(&mut self, order: &mut OrderAny) {
1192 let mut command = match self
1194 .manager
1195 .borrow_mut()
1196 .pop_submit_order_command(order.client_order_id())
1197 {
1198 Some(command) => command,
1199 None => panic!("invalid operation `_fill_market_order` with no command"),
1200 };
1201
1202 let trigger_instrument_id = order
1203 .trigger_instrument_id()
1204 .unwrap_or(order.instrument_id());
1205
1206 let mut matching_core = self
1207 .matching_cores
1208 .borrow()
1209 .get(&trigger_instrument_id)
1210 .cloned();
1211 if let Some(ref mut matching_core) = matching_core {
1212 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
1213 log::error!("Cannot delete order: {:?}", e);
1214 } else {
1215 self.matching_cores
1217 .borrow_mut()
1218 .insert(trigger_instrument_id, matching_core.clone());
1219 }
1220 }
1221
1222 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1223
1224 let mut transformed = MarketOrder::new(
1226 order.trader_id(),
1227 order.strategy_id(),
1228 order.instrument_id(),
1229 order.client_order_id(),
1230 order.order_side(),
1231 order.quantity(),
1232 order.time_in_force(),
1233 UUID4::new(),
1234 self.clock.borrow().timestamp_ns(),
1235 order.is_reduce_only(),
1236 order.is_quote_quantity(),
1237 order.contingency_type(),
1238 order.order_list_id(),
1239 order.linked_order_ids(),
1240 order.parent_order_id(),
1241 order.exec_algorithm_id(),
1242 order.exec_algorithm_params(),
1243 order.exec_spawn_id(),
1244 order.tags(),
1245 );
1246
1247 let original_events = order.events();
1248
1249 for event in original_events {
1250 transformed.events.insert(0, event.clone());
1251 }
1252
1253 if let Err(e) = self.cache.borrow_mut().add_order(
1254 OrderAny::Market(transformed.clone()),
1255 command.position_id,
1256 Some(command.client_id),
1257 true,
1258 ) {
1259 log::error!("Failed to add order: {}", e);
1260 }
1261
1262 command.order = OrderAny::Market(transformed.clone());
1264
1265 self.msgbus.borrow().publish(
1266 &format!("events.order.{}", order.strategy_id()).into(),
1267 transformed.last_event(),
1268 );
1269
1270 let released_price = match order.order_side() {
1273 OrderSide::Buy => matching_core.unwrap().ask,
1274 OrderSide::Sell => matching_core.unwrap().bid,
1275 _ => panic!("invalid `OrderSide`"),
1276 };
1277
1278 let ts_now = self.clock.borrow().timestamp_ns();
1280 let event = OrderReleased::new(
1281 order.trader_id(),
1282 order.strategy_id(),
1283 order.instrument_id(),
1284 order.client_order_id(),
1285 released_price.unwrap(),
1286 UUID4::new(),
1287 ts_now,
1288 ts_now,
1289 );
1290
1291 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1292 log::error!("Failed to apply order event: {}", e);
1293 }
1294
1295 if let Err(e) = self
1296 .cache
1297 .borrow_mut()
1298 .update_order(&OrderAny::Market(transformed))
1299 {
1300 log::error!("Failed to update order: {}", e);
1301 }
1302 self.manager
1303 .borrow()
1304 .send_risk_event(OrderEventAny::Released(event));
1305
1306 log::info!("Releasing order {}", order.client_order_id());
1307
1308 self.msgbus.borrow().publish(
1310 &format!("events.order.{}", order.strategy_id()).into(),
1311 &OrderEventAny::Released(event),
1312 );
1313
1314 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1315 self.manager
1316 .borrow()
1317 .send_algo_command(command, exec_algorithm_id);
1318 } else {
1319 self.manager
1320 .borrow()
1321 .send_exec_command(TradingCommand::SubmitOrder(command));
1322 }
1323 }
1324
1325 fn update_trailing_stop_order(&self, matching_core: &OrderMatchingCore, order: &mut OrderAny) {
1326 let mut bid = None;
1327 let mut ask = None;
1328 let mut last = None;
1329
1330 if matching_core.is_bid_initialized {
1331 bid = matching_core.bid;
1332 }
1333 if matching_core.is_ask_initialized {
1334 ask = matching_core.ask;
1335 }
1336 if matching_core.is_last_initialized {
1337 last = matching_core.last;
1338 }
1339
1340 let quote_tick = self
1341 .cache
1342 .borrow()
1343 .quote(&matching_core.instrument_id)
1344 .copied();
1345 let trade_tick = self
1346 .cache
1347 .borrow()
1348 .trade(&matching_core.instrument_id)
1349 .copied();
1350
1351 if bid.is_none() && quote_tick.is_some() {
1352 bid = Some(quote_tick.unwrap().bid_price);
1353 }
1354 if ask.is_none() && quote_tick.is_some() {
1355 ask = Some(quote_tick.unwrap().ask_price);
1356 }
1357 if last.is_none() && trade_tick.is_some() {
1358 last = Some(trade_tick.unwrap().price);
1359 }
1360
1361 let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1362 trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1363 {
1364 (new_trigger_price, new_price)
1365 } else {
1366 log::warn!("Cannot calculate trailing stop order");
1367 return;
1368 };
1369
1370 let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1371 (None, None) => return, _ => (new_trigger_price, new_price),
1373 };
1374
1375 let ts_now = self.clock.borrow().timestamp_ns();
1377 let event = OrderUpdated::new(
1378 order.trader_id(),
1379 order.strategy_id(),
1380 order.instrument_id(),
1381 order.client_order_id(),
1382 order.quantity(),
1383 UUID4::new(),
1384 ts_now,
1385 ts_now,
1386 false,
1387 order.venue_order_id(),
1388 order.account_id(),
1389 new_price,
1390 new_trigger_price,
1391 );
1392
1393 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1394 log::error!("Failed to apply order event: {}", e);
1395 }
1396 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1397 log::error!("Failed to update order: {}", e);
1398 }
1399
1400 self.manager
1401 .borrow()
1402 .send_risk_event(OrderEventAny::Updated(event));
1403 }
1404}