1use std::{
17 cell::RefCell,
18 collections::{HashMap, HashSet},
19 fmt::Debug,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 logging::{CMD, EVT, RECV},
27 messages::execution::{
28 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::UUID4;
33use nautilus_model::{
34 data::{OrderBookDeltas, QuoteTick, TradeTick},
35 enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
36 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38 instruments::Instrument,
39 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40 types::{Price, Quantity},
41};
42
43use crate::{
44 matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45 trailing::trailing_stop_calculate,
46};
47
/// Emulates orders locally until their trigger condition is met, then releases
/// them as plain market or limit orders.
///
/// One [`OrderMatchingCore`] is kept per trigger instrument; market data handlers
/// feed prices into the cores and the emulator transforms and releases orders
/// through the [`OrderManager`].
pub struct OrderEmulator {
    /// Shared clock used to timestamp the events and commands generated here.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up and update orders, instruments, quotes and trades.
    cache: Rc<RefCell<Cache>>,
    /// Order manager that caches submit commands and sends commands/events onwards.
    manager: OrderManager,
    /// Matching cores keyed by (trigger) instrument ID.
    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
    /// Instruments recorded for quote-driven (`Default` / `BidAsk`) triggers.
    subscribed_quotes: HashSet<InstrumentId>,
    /// Instruments recorded for trade-driven (`LastPrice`) triggers.
    subscribed_trades: HashSet<InstrumentId>,
    /// Strategies whose order/position event topics have been subscribed to.
    subscribed_strategies: HashSet<StrategyId>,
    /// Positions seen on submit commands.
    monitored_positions: HashSet<PositionId>,
    /// Handler registered on the message bus for strategy events, if one was set.
    on_event_handler: Option<ShareableMessageHandler>,
}
59
60impl Debug for OrderEmulator {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct(stringify!(OrderEmulator))
63 .field("cores", &self.matching_cores.len())
64 .field("subscribed_quotes", &self.subscribed_quotes.len())
65 .finish()
66 }
67}
68
69impl OrderEmulator {
70 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71 let active_local = true;
75 let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77 Self {
78 clock,
79 cache,
80 manager,
81 matching_cores: HashMap::new(),
82 subscribed_quotes: HashSet::new(),
83 subscribed_trades: HashSet::new(),
84 subscribed_strategies: HashSet::new(),
85 monitored_positions: HashSet::new(),
86 on_event_handler: None,
87 }
88 }
89
90 pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91 self.on_event_handler = Some(handler);
92 }
93
94 #[must_use]
108 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110 quotes.sort();
111 quotes
112 }
113
114 #[must_use]
115 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117 trades.sort();
118 trades
119 }
120
121 #[must_use]
122 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123 self.manager.get_submit_order_commands()
124 }
125
126 #[must_use]
127 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128 self.matching_cores.get(instrument_id).cloned()
129 }
130
131 pub fn on_start(&mut self) -> anyhow::Result<()> {
141 let emulated_orders: Vec<OrderAny> = self
142 .cache
143 .borrow()
144 .orders_emulated(None, None, None, None)
145 .into_iter()
146 .cloned()
147 .collect();
148
149 if emulated_orders.is_empty() {
150 log::error!("No emulated orders to reactivate");
151 return Ok(());
152 }
153
154 for order in emulated_orders {
155 if !matches!(
156 order.status(),
157 OrderStatus::Initialized | OrderStatus::Emulated
158 ) {
159 continue; }
161
162 if let Some(parent_order_id) = &order.parent_order_id() {
163 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164 order.clone()
165 } else {
166 log::error!("Cannot handle order: parent {parent_order_id} not found");
167 continue;
168 };
169
170 let is_position_closed = parent_order
171 .position_id()
172 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173 if parent_order.is_closed() && is_position_closed {
174 self.manager.cancel_order(&order);
175 continue; }
177
178 if parent_order.contingency_type() == Some(ContingencyType::Oto)
179 && (parent_order.is_active_local()
180 || parent_order.filled_qty() == Quantity::zero(0))
181 {
182 continue; }
184 }
185
186 let position_id = self
187 .cache
188 .borrow()
189 .position_id(&order.client_order_id())
190 .copied();
191 let client_id = self
192 .cache
193 .borrow()
194 .client_id(&order.client_order_id())
195 .copied();
196
197 let command = SubmitOrder::new(
198 order.trader_id(),
199 client_id.unwrap(),
200 order.strategy_id(),
201 order.instrument_id(),
202 order.client_order_id(),
203 order.venue_order_id().unwrap(),
204 order.clone(),
205 order.exec_algorithm_id(),
206 position_id,
207 UUID4::new(),
208 self.clock.borrow().timestamp_ns(),
209 )?;
210
211 self.handle_submit_order(command);
212 }
213
214 Ok(())
215 }
216
217 pub fn on_event(&mut self, event: OrderEventAny) {
218 log::info!("{RECV}{EVT} {event}");
219
220 self.manager.handle_event(event.clone());
221
222 if let Some(order) = self.cache.borrow().order(&event.client_order_id())
223 && order.is_closed()
224 && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
225 && let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
226 {
227 log::error!("Error deleting order: {e}");
228 }
229 }
231
    /// Lifecycle hook invoked on stop; currently a no-op.
    pub const fn on_stop(&self) {}
233
    /// Resets the emulator: resets the order manager and drops every matching core.
    ///
    /// Subscription and monitoring sets are left untouched.
    pub fn on_reset(&mut self) {
        self.manager.reset();
        self.matching_cores.clear();
    }
238
    /// Lifecycle hook invoked on dispose; currently a no-op.
    pub const fn on_dispose(&self) {}
240
241 pub fn execute(&mut self, command: TradingCommand) {
242 log::info!("{RECV}{CMD} {command}");
243
244 match command {
245 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
246 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
247 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
248 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
249 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
250 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
251 }
252 }
253
254 fn create_matching_core(
255 &mut self,
256 instrument_id: InstrumentId,
257 price_increment: Price,
258 ) -> OrderMatchingCore {
259 let matching_core =
260 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
261 self.matching_cores
262 .insert(instrument_id, matching_core.clone());
263 log::info!("Creating matching core for {instrument_id:?}");
264 matching_core
265 }
266
267 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
271 let mut order = command.order.clone();
272 let emulation_trigger = order.emulation_trigger();
273
274 assert_ne!(
275 emulation_trigger,
276 Some(TriggerType::NoTrigger),
277 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
278 );
279 assert!(
280 self.manager
281 .get_submit_order_commands()
282 .contains_key(&order.client_order_id()),
283 "command.order.client_order_id must be in submit_order_commands"
284 );
285
286 if !matches!(
287 emulation_trigger,
288 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
289 ) {
290 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
291 self.manager.cancel_order(&order);
292 return;
293 }
294
295 self.check_monitoring(command.strategy_id, command.position_id);
296
297 let trigger_instrument_id = order
299 .trigger_instrument_id()
300 .unwrap_or_else(|| order.instrument_id());
301
302 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
303
304 let mut matching_core = if let Some(core) = matching_core {
305 core
306 } else {
307 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
309 let synthetic = self
310 .cache
311 .borrow()
312 .synthetic(&trigger_instrument_id)
313 .cloned();
314 if let Some(synthetic) = synthetic {
315 (synthetic.id, synthetic.price_increment)
316 } else {
317 log::error!(
318 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
319 );
320 self.manager.cancel_order(&order);
321 return;
322 }
323 } else {
324 let instrument = self
325 .cache
326 .borrow()
327 .instrument(&trigger_instrument_id)
328 .cloned();
329 if let Some(instrument) = instrument {
330 (instrument.id(), instrument.price_increment())
331 } else {
332 log::error!(
333 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
334 );
335 self.manager.cancel_order(&order);
336 return;
337 }
338 };
339
340 self.create_matching_core(instrument_id, price_increment)
341 };
342
343 if matches!(
345 order.order_type(),
346 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
347 ) {
348 self.update_trailing_stop_order(&mut order);
349 if order.trigger_price().is_none() {
350 log::error!(
351 "Cannot handle trailing stop order with no trigger_price and no market updates"
352 );
353
354 self.manager.cancel_order(&order);
355 return;
356 }
357 }
358
359 self.manager.cache_submit_order_command(command);
361
362 matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
364
365 match emulation_trigger.unwrap() {
367 TriggerType::Default | TriggerType::BidAsk => {
368 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
369 if !trigger_instrument_id.is_synthetic() {
370 }
373 self.subscribed_quotes.insert(trigger_instrument_id);
376 }
377 }
378 TriggerType::LastPrice => {
379 if !self.subscribed_trades.contains(&trigger_instrument_id) {
380 self.subscribed_trades.insert(trigger_instrument_id);
383 }
384 }
385 _ => {
386 log::error!("Invalid TriggerType: {emulation_trigger:?}");
387 return;
388 }
389 }
390
391 if !self
393 .manager
394 .get_submit_order_commands()
395 .contains_key(&order.client_order_id())
396 {
397 return; }
399
400 if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
402 log::error!("Cannot add order: {e:?}");
403 return;
404 }
405
406 if order.status() == OrderStatus::Initialized {
408 let event = OrderEmulated::new(
409 order.trader_id(),
410 order.strategy_id(),
411 order.instrument_id(),
412 order.client_order_id(),
413 UUID4::new(),
414 self.clock.borrow().timestamp_ns(),
415 self.clock.borrow().timestamp_ns(),
416 );
417
418 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
419 log::error!("Cannot apply order event: {e:?}");
420 return;
421 }
422
423 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
424 log::error!("Cannot update order: {e:?}");
425 return;
426 }
427
428 self.manager.send_risk_event(OrderEventAny::Emulated(event));
429
430 msgbus::publish(
431 format!("events.order.{}", order.strategy_id()).into(),
432 &OrderEventAny::Emulated(event),
433 );
434 }
435
436 self.matching_cores
438 .insert(trigger_instrument_id, matching_core);
439
440 log::info!("Emulating {order}");
441 }
442
443 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
444 self.check_monitoring(command.strategy_id, command.position_id);
445
446 for order in &command.order_list.orders {
447 if let Some(parent_order_id) = order.parent_order_id() {
448 let cache = self.cache.borrow();
449 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
450 parent_order
451 } else {
452 log::error!("Parent order for {} not found", order.client_order_id());
453 continue;
454 };
455
456 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
457 continue; }
459 }
460
461 if let Err(e) = self.manager.create_new_submit_order(
462 order,
463 command.position_id,
464 Some(command.client_id),
465 ) {
466 log::error!("Error creating new submit order: {e}");
467 }
468 }
469 }
470
471 fn handle_modify_order(&mut self, command: ModifyOrder) {
472 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
473 let price = match command.price {
474 Some(price) => Some(price),
475 None => order.price(),
476 };
477
478 let trigger_price = match command.trigger_price {
479 Some(trigger_price) => Some(trigger_price),
480 None => order.trigger_price(),
481 };
482
483 let ts_now = self.clock.borrow().timestamp_ns();
485 let event = OrderUpdated::new(
486 order.trader_id(),
487 order.strategy_id(),
488 order.instrument_id(),
489 order.client_order_id(),
490 command.quantity.unwrap_or(order.quantity()),
491 UUID4::new(),
492 ts_now,
493 ts_now,
494 false,
495 order.venue_order_id(),
496 order.account_id(),
497 price,
498 trigger_price,
499 );
500
501 self.manager.send_exec_event(OrderEventAny::Updated(event));
502
503 let trigger_instrument_id = order
504 .trigger_instrument_id()
505 .unwrap_or_else(|| order.instrument_id());
506
507 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
508 matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
509 } else {
510 log::error!(
511 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
512 );
513 }
514 } else {
515 log::error!("Cannot modify order: {} not found", command.client_order_id);
516 }
517 }
518
519 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
520 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
521 order.clone()
522 } else {
523 log::error!("Cannot cancel order: {} not found", command.client_order_id);
524 return;
525 };
526
527 let trigger_instrument_id = order
528 .trigger_instrument_id()
529 .unwrap_or_else(|| order.instrument_id());
530
531 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
532 core
533 } else {
534 self.manager.cancel_order(&order);
535 return;
536 };
537
538 if !matching_core.order_exists(order.client_order_id())
539 && order.is_open()
540 && !order.is_pending_cancel()
541 {
542 self.manager
544 .send_exec_command(TradingCommand::CancelOrder(command));
545 } else {
546 self.manager.cancel_order(&order);
547 }
548 }
549
550 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
551 let instrument_id = command.instrument_id;
552 let matching_core = match self.matching_cores.get(&instrument_id) {
553 Some(core) => core,
554 None => return, };
556
557 let orders_to_cancel = match command.order_side {
558 OrderSide::NoOrderSide => {
559 let mut all_orders = Vec::new();
561 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
562 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
563 all_orders
564 }
565 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
566 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
567 };
568
569 for order in orders_to_cancel {
571 self.manager.cancel_order(&OrderAny::from(order));
572 }
573 }
574
575 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
578 log::info!(
579 "Updating order {} quantity to {new_quantity}",
580 order.client_order_id(),
581 );
582
583 let ts_now = self.clock.borrow().timestamp_ns();
585 let event = OrderUpdated::new(
586 order.trader_id(),
587 order.strategy_id(),
588 order.instrument_id(),
589 order.client_order_id(),
590 new_quantity,
591 UUID4::new(),
592 ts_now,
593 ts_now,
594 false,
595 None,
596 order.account_id(),
597 None,
598 None,
599 );
600
601 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
602 log::error!("Cannot apply order event: {e:?}");
603 return;
604 }
605 if let Err(e) = self.cache.borrow_mut().update_order(order) {
606 log::error!("Cannot update order: {e:?}");
607 return;
608 }
609
610 self.manager.send_risk_event(OrderEventAny::Updated(event));
611 }
612
613 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
616 log::debug!("Processing {deltas:?}");
617
618 let instrument_id = &deltas.instrument_id;
619 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
620 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
621 let best_bid = book.best_bid_price();
622 let best_ask = book.best_ask_price();
623
624 if let Some(best_bid) = best_bid {
625 matching_core.set_bid_raw(best_bid);
626 }
627
628 if let Some(best_ask) = best_ask {
629 matching_core.set_ask_raw(best_ask);
630 }
631 } else {
632 log::error!(
633 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
634 deltas.instrument_id
635 );
636 }
637
638 self.iterate_orders(instrument_id);
639 } else {
640 log::error!(
641 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
642 deltas.instrument_id
643 );
644 }
645 }
646
647 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
648 log::debug!("Processing {quote}:?");
649
650 let instrument_id = "e.instrument_id;
651 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
652 matching_core.set_bid_raw(quote.bid_price);
653 matching_core.set_ask_raw(quote.ask_price);
654
655 self.iterate_orders(instrument_id);
656 } else {
657 log::error!(
658 "Cannot handle `QuoteTick`: no matching core for instrument {}",
659 quote.instrument_id
660 );
661 }
662 }
663
664 pub fn on_trade_tick(&mut self, trade: TradeTick) {
665 log::debug!("Processing {trade:?}");
666
667 let instrument_id = &trade.instrument_id;
668 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
669 matching_core.set_last_raw(trade.price);
670 if !self.subscribed_quotes.contains(instrument_id) {
671 matching_core.set_bid_raw(trade.price);
672 matching_core.set_ask_raw(trade.price);
673 }
674
675 self.iterate_orders(instrument_id);
676 } else {
677 log::error!(
678 "Cannot handle `TradeTick`: no matching core for instrument {}",
679 trade.instrument_id
680 );
681 }
682 }
683
684 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
685 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
686 matching_core.iterate();
687
688 matching_core.get_orders()
689 } else {
690 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
691 return;
692 };
693
694 for order in orders {
695 if order.is_closed() {
696 continue;
697 }
698
699 let mut order: OrderAny = order.clone().into();
700 if matches!(
701 order.order_type(),
702 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
703 ) {
704 self.update_trailing_stop_order(&mut order);
705 }
706 }
707 }
708
709 pub fn cancel_order(&mut self, order: &OrderAny) {
710 log::info!("Canceling order {}", order.client_order_id());
711
712 let mut order = order.clone();
713 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
714
715 let trigger_instrument_id = order
716 .trigger_instrument_id()
717 .unwrap_or(order.instrument_id());
718
719 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
720 && let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
721 {
722 log::error!("Cannot delete order: {e:?}");
723 }
724
725 self.cache
726 .borrow_mut()
727 .update_order_pending_cancel_local(&order);
728
729 let ts_now = self.clock.borrow().timestamp_ns();
731 let event = OrderCanceled::new(
732 order.trader_id(),
733 order.strategy_id(),
734 order.instrument_id(),
735 order.client_order_id(),
736 UUID4::new(),
737 ts_now,
738 ts_now,
739 false,
740 order.venue_order_id(),
741 order.account_id(),
742 );
743
744 self.manager.send_exec_event(OrderEventAny::Canceled(event));
745 }
746
747 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
748 if !self.subscribed_strategies.contains(&strategy_id) {
749 if let Some(handler) = &self.on_event_handler {
751 msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
752 msgbus::subscribe_str(
753 format!("events.position.{strategy_id}"),
754 handler.clone(),
755 None,
756 );
757 self.subscribed_strategies.insert(strategy_id);
758 log::info!("Subscribed to strategy {strategy_id} order and position events");
759 }
760 }
761
762 if let Some(position_id) = position_id
763 && !self.monitored_positions.contains(&position_id)
764 {
765 self.monitored_positions.insert(position_id);
766 }
767 }
768
769 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
773 match order.order_type() {
774 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
775 self.fill_limit_order(order);
776 }
777 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
778 self.fill_market_order(order);
779 }
780 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
781 }
782 }
783
784 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
788 if matches!(order.order_type(), OrderType::Limit) {
789 self.fill_market_order(order);
790 return;
791 }
792
793 let mut command = match self
795 .manager
796 .pop_submit_order_command(order.client_order_id())
797 {
798 Some(command) => command,
799 None => return, };
801
802 let trigger_instrument_id = order
803 .trigger_instrument_id()
804 .unwrap_or(order.instrument_id());
805
806 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
807 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
808 log::error!("Error deleting order: {e:?}");
809 }
810
811 let emulation_trigger = TriggerType::NoTrigger;
812
813 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
815 order.trader_id(),
816 order.strategy_id(),
817 order.instrument_id(),
818 order.client_order_id(),
819 order.order_side(),
820 order.quantity(),
821 order.price().unwrap(),
822 order.time_in_force(),
823 order.expire_time(),
824 order.is_post_only(),
825 order.is_reduce_only(),
826 order.is_quote_quantity(),
827 order.display_qty(),
828 Some(emulation_trigger),
829 Some(trigger_instrument_id),
830 order.contingency_type(),
831 order.order_list_id(),
832 order.linked_order_ids().map(Vec::from),
833 order.parent_order_id(),
834 order.exec_algorithm_id(),
835 order.exec_algorithm_params().cloned(),
836 order.exec_spawn_id(),
837 order.tags().map(Vec::from),
838 UUID4::new(),
839 self.clock.borrow().timestamp_ns(),
840 ) {
841 transformed
842 } else {
843 log::error!("Cannot create limit order");
844 return;
845 };
846 transformed.liquidity_side = order.liquidity_side();
847
848 let original_events = order.events();
855
856 for event in original_events {
857 transformed.events.insert(0, event.clone());
858 }
859
860 if let Err(e) = self.cache.borrow_mut().add_order(
861 OrderAny::Limit(transformed.clone()),
862 command.position_id,
863 Some(command.client_id),
864 true,
865 ) {
866 log::error!("Failed to add order: {e}");
867 }
868
869 command.order = OrderAny::Limit(transformed.clone());
871
872 msgbus::publish(
873 format!("events.order.{}", order.strategy_id()).into(),
874 transformed.last_event(),
875 );
876
877 let released_price = match order.order_side() {
879 OrderSide::Buy => matching_core.ask,
880 OrderSide::Sell => matching_core.bid,
881 _ => panic!("invalid `OrderSide`"),
882 };
883
884 let event = OrderReleased::new(
886 order.trader_id(),
887 order.strategy_id(),
888 order.instrument_id(),
889 order.client_order_id(),
890 released_price.unwrap(),
891 UUID4::new(),
892 self.clock.borrow().timestamp_ns(),
893 self.clock.borrow().timestamp_ns(),
894 );
895
896 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
897 log::error!("Failed to apply order event: {e}");
898 }
899
900 if let Err(e) = self
901 .cache
902 .borrow_mut()
903 .update_order(&OrderAny::Limit(transformed.clone()))
904 {
905 log::error!("Failed to update order: {e}");
906 }
907
908 self.manager.send_risk_event(OrderEventAny::Released(event));
909
910 log::info!("Releasing order {}", order.client_order_id());
911
912 msgbus::publish(
914 format!("events.order.{}", transformed.strategy_id()).into(),
915 &OrderEventAny::Released(event),
916 );
917
918 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
919 self.manager.send_algo_command(command, exec_algorithm_id);
920 } else {
921 self.manager
922 .send_exec_command(TradingCommand::SubmitOrder(command));
923 }
924 } else {
925 log::error!(
926 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
927 );
928 }
929 }
930
931 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
935 let mut command = match self
937 .manager
938 .pop_submit_order_command(order.client_order_id())
939 {
940 Some(command) => command,
941 None => panic!("invalid operation `_fill_market_order` with no command"),
942 };
943
944 let trigger_instrument_id = order
945 .trigger_instrument_id()
946 .unwrap_or(order.instrument_id());
947
948 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
949 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
950 log::error!("Cannot delete order: {e:?}");
951 }
952
953 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
954
955 let mut transformed = MarketOrder::new(
957 order.trader_id(),
958 order.strategy_id(),
959 order.instrument_id(),
960 order.client_order_id(),
961 order.order_side(),
962 order.quantity(),
963 order.time_in_force(),
964 UUID4::new(),
965 self.clock.borrow().timestamp_ns(),
966 order.is_reduce_only(),
967 order.is_quote_quantity(),
968 order.contingency_type(),
969 order.order_list_id(),
970 order.linked_order_ids().map(Vec::from),
971 order.parent_order_id(),
972 order.exec_algorithm_id(),
973 order.exec_algorithm_params().cloned(),
974 order.exec_spawn_id(),
975 order.tags().map(Vec::from),
976 );
977
978 let original_events = order.events();
979
980 for event in original_events {
981 transformed.events.insert(0, event.clone());
982 }
983
984 if let Err(e) = self.cache.borrow_mut().add_order(
985 OrderAny::Market(transformed.clone()),
986 command.position_id,
987 Some(command.client_id),
988 true,
989 ) {
990 log::error!("Failed to add order: {e}");
991 }
992
993 command.order = OrderAny::Market(transformed.clone());
995
996 msgbus::publish(
997 format!("events.order.{}", order.strategy_id()).into(),
998 transformed.last_event(),
999 );
1000
1001 let released_price = match order.order_side() {
1004 OrderSide::Buy => matching_core.ask,
1005 OrderSide::Sell => matching_core.bid,
1006 _ => panic!("invalid `OrderSide`"),
1007 };
1008
1009 let ts_now = self.clock.borrow().timestamp_ns();
1011 let event = OrderReleased::new(
1012 order.trader_id(),
1013 order.strategy_id(),
1014 order.instrument_id(),
1015 order.client_order_id(),
1016 released_price.unwrap(),
1017 UUID4::new(),
1018 ts_now,
1019 ts_now,
1020 );
1021
1022 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1023 log::error!("Failed to apply order event: {e}");
1024 }
1025
1026 if let Err(e) = self
1027 .cache
1028 .borrow_mut()
1029 .update_order(&OrderAny::Market(transformed))
1030 {
1031 log::error!("Failed to update order: {e}");
1032 }
1033 self.manager.send_risk_event(OrderEventAny::Released(event));
1034
1035 log::info!("Releasing order {}", order.client_order_id());
1036
1037 msgbus::publish(
1039 format!("events.order.{}", order.strategy_id()).into(),
1040 &OrderEventAny::Released(event),
1041 );
1042
1043 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1044 self.manager.send_algo_command(command, exec_algorithm_id);
1045 } else {
1046 self.manager
1047 .send_exec_command(TradingCommand::SubmitOrder(command));
1048 }
1049 } else {
1050 log::error!(
1051 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1052 );
1053 }
1054 }
1055
1056 #[allow(clippy::too_many_lines)]
1057 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1058 let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1059 log::error!(
1060 "Cannot update trailing-stop order: no matching core for instrument {}",
1061 order.instrument_id()
1062 );
1063 return;
1064 };
1065
1066 let mut bid = matching_core.bid;
1067 let mut ask = matching_core.ask;
1068 let mut last = matching_core.last;
1069
1070 if bid.is_none() || ask.is_none() || last.is_none() {
1071 if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1072 bid.get_or_insert(q.bid_price);
1073 ask.get_or_insert(q.ask_price);
1074 }
1075 if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1076 last.get_or_insert(t.price);
1077 }
1078 }
1079
1080 let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1081 matching_core.price_increment,
1082 order.trigger_price(),
1083 order.activation_price(),
1084 order,
1085 bid,
1086 ask,
1087 last,
1088 ) {
1089 Ok(pair) => pair,
1090 Err(e) => {
1091 log::warn!("Cannot calculate trailing-stop update: {e}");
1092 return;
1093 }
1094 };
1095
1096 if new_trigger_px.is_none() && new_limit_px.is_none() {
1097 return;
1098 }
1099
1100 let ts_now = self.clock.borrow().timestamp_ns();
1101 let update = OrderUpdated::new(
1102 order.trader_id(),
1103 order.strategy_id(),
1104 order.instrument_id(),
1105 order.client_order_id(),
1106 order.quantity(),
1107 UUID4::new(),
1108 ts_now,
1109 ts_now,
1110 false,
1111 order.venue_order_id(),
1112 order.account_id(),
1113 new_limit_px,
1114 new_trigger_px,
1115 );
1116 let wrapped = OrderEventAny::Updated(update);
1117 if let Err(e) = order.apply(wrapped.clone()) {
1118 log::error!("Failed to apply order event: {e}");
1119 return;
1120 }
1121 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1122 log::error!("Failed to update order in cache: {e}");
1123 return;
1124 }
1125 self.manager.send_risk_event(wrapped);
1126 }
1127}