1use std::{
17 cell::RefCell,
18 collections::{HashMap, HashSet},
19 rc::Rc,
20};
21
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 logging::{CMD, EVT, RECV},
26 msgbus::{
27 handler::ShareableMessageHandler,
28 {self},
29 },
30};
31use nautilus_core::uuid::UUID4;
32use nautilus_model::{
33 data::{OrderBookDeltas, QuoteTick, TradeTick},
34 enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
35 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
36 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
37 instruments::Instrument,
38 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
39 types::{Price, Quantity},
40};
41
42use crate::{
43 matching_core::OrderMatchingCore,
44 messages::{
45 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
46 cancel::CancelOrderHandlerAny, modify::ModifyOrderHandlerAny,
47 submit::SubmitOrderHandlerAny,
48 },
49 order_manager::manager::OrderManager,
50 trailing::trailing_stop_calculate,
51};
52
/// State for emulating orders locally.
///
/// Holds one [`OrderMatchingCore`] per instrument and delegates command caching and
/// event dispatch to an [`OrderManager`].
pub struct OrderEmulator {
    /// Shared clock; read whenever an event timestamp is needed.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache; used to look up and update orders, instruments, quotes and trades.
    cache: Rc<RefCell<Cache>>,
    /// Order manager constructed in `new` from the same clock and cache.
    manager: OrderManager,
    /// Matching cores keyed by instrument ID.
    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
    /// Trigger instruments recorded for orders emulated with a `Default`/`BidAsk` trigger.
    subscribed_quotes: HashSet<InstrumentId>,
    /// Trigger instruments recorded for orders emulated with a `LastPrice` trigger.
    subscribed_trades: HashSet<InstrumentId>,
    /// Strategies whose order and position event topics have been subscribed to.
    subscribed_strategies: HashSet<StrategyId>,
    /// Position IDs seen on submitted commands.
    monitored_positions: HashSet<PositionId>,
    /// Handler registered on the message bus for strategy events; `None` until set.
    on_event_handler: Option<ShareableMessageHandler>,
}
64
65impl OrderEmulator {
66 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
67 let active_local = true;
71 let manager =
72 OrderManager::new(clock.clone(), cache.clone(), active_local, None, None, None);
73
74 Self {
75 clock,
76 cache,
77 manager,
78 matching_cores: HashMap::new(),
79 subscribed_quotes: HashSet::new(),
80 subscribed_trades: HashSet::new(),
81 subscribed_strategies: HashSet::new(),
82 monitored_positions: HashSet::new(),
83 on_event_handler: None,
84 }
85 }
86
    /// Stores the handler that `check_monitoring` registers on the message bus for
    /// strategy order and position events, replacing any handler set earlier.
    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
        self.on_event_handler = Some(handler);
    }
90
    /// Forwards the submit-order handler to the underlying order manager.
    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
        self.manager.set_submit_order_handler(handler);
    }
94
    /// Forwards the cancel-order handler to the underlying order manager.
    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
        self.manager.set_cancel_order_handler(handler);
    }
98
    /// Forwards the modify-order handler to the underlying order manager.
    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
        self.manager.set_modify_order_handler(handler);
    }
102
103 #[must_use]
104 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
105 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
106 quotes.sort();
107 quotes
108 }
109
110 #[must_use]
111 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
112 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
113 trades.sort();
114 trades
115 }
116
    /// Returns the submit-order commands reported by the order manager, keyed by
    /// client order ID.
    #[must_use]
    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
        self.manager.get_submit_order_commands()
    }
121
    /// Returns a clone of the matching core registered for `instrument_id`, or `None`
    /// when no core exists for that instrument.
    #[must_use]
    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
        self.matching_cores.get(instrument_id).cloned()
    }
126
127 pub fn on_start(&mut self) -> anyhow::Result<()> {
128 let emulated_orders: Vec<OrderAny> = self
129 .cache
130 .borrow()
131 .orders_emulated(None, None, None, None)
132 .into_iter()
133 .cloned()
134 .collect();
135
136 if emulated_orders.is_empty() {
137 log::error!("No emulated orders to reactivate");
138 return Ok(());
139 }
140
141 for order in emulated_orders {
142 if !matches!(
143 order.status(),
144 OrderStatus::Initialized | OrderStatus::Emulated
145 ) {
146 continue; }
148
149 if let Some(parent_order_id) = &order.parent_order_id() {
150 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
151 order.clone()
152 } else {
153 log::error!("Cannot handle order: parent {parent_order_id} not found");
154 continue;
155 };
156
157 let is_position_closed = parent_order
158 .position_id()
159 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
160 if parent_order.is_closed() && is_position_closed {
161 self.manager.cancel_order(&order);
162 continue; }
164
165 if parent_order.contingency_type() == Some(ContingencyType::Oto)
166 && (parent_order.is_active_local()
167 || parent_order.filled_qty() == Quantity::zero(0))
168 {
169 continue; }
171 }
172
173 let position_id = self
174 .cache
175 .borrow()
176 .position_id(&order.client_order_id())
177 .copied();
178 let client_id = self
179 .cache
180 .borrow()
181 .client_id(&order.client_order_id())
182 .copied();
183
184 let command = SubmitOrder::new(
185 order.trader_id(),
186 client_id.unwrap(),
187 order.strategy_id(),
188 order.instrument_id(),
189 order.client_order_id(),
190 order.venue_order_id().unwrap(),
191 order.clone(),
192 order.exec_algorithm_id(),
193 position_id,
194 UUID4::new(),
195 self.clock.borrow().timestamp_ns(),
196 )?;
197
198 self.handle_submit_order(command);
199 }
200
201 Ok(())
202 }
203
204 pub fn on_event(&mut self, event: OrderEventAny) {
205 log::info!("{RECV}{EVT} {event}");
206
207 self.manager.handle_event(event.clone());
208
209 if let Some(order) = self.cache.borrow().order(&event.client_order_id()) {
210 if order.is_closed() {
211 if let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id()) {
212 if let Err(e) =
213 matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
214 {
215 log::error!("Error deleting order: {e}");
216 }
217 }
218 }
219 }
220 }
222
    /// Stop hook; currently a no-op.
    pub const fn on_stop(&self) {}
224
    /// Resets the order manager and drops every matching core.
    pub fn on_reset(&mut self) {
        self.manager.reset();
        self.matching_cores.clear();
    }
229
    /// Dispose hook; currently a no-op.
    pub const fn on_dispose(&self) {}
231
232 pub fn execute(&mut self, command: TradingCommand) {
233 log::info!("{RECV}{CMD} {command}");
234
235 match command {
236 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
237 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
238 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
239 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
240 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
241 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
242 }
243 }
244
245 fn create_matching_core(
246 &mut self,
247 instrument_id: InstrumentId,
248 price_increment: Price,
249 ) -> OrderMatchingCore {
250 let matching_core =
251 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
252 self.matching_cores
253 .insert(instrument_id, matching_core.clone());
254 log::info!("Creating matching core for {instrument_id:?}");
255 matching_core
256 }
257
258 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
259 let mut order = command.order.clone();
260 let emulation_trigger = order.emulation_trigger();
261
262 assert_ne!(
263 emulation_trigger,
264 Some(TriggerType::NoTrigger),
265 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
266 );
267 assert!(
268 self.manager
269 .get_submit_order_commands()
270 .contains_key(&order.client_order_id()),
271 "command.order.client_order_id must be in submit_order_commands"
272 );
273
274 if !matches!(
275 emulation_trigger,
276 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
277 ) {
278 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
279 self.manager.cancel_order(&order);
280 return;
281 }
282
283 self.check_monitoring(command.strategy_id, command.position_id);
284
285 let trigger_instrument_id = order
287 .trigger_instrument_id()
288 .unwrap_or_else(|| order.instrument_id());
289
290 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
291
292 let mut matching_core = if let Some(core) = matching_core {
293 core
294 } else {
295 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
297 let synthetic = self
298 .cache
299 .borrow()
300 .synthetic(&trigger_instrument_id)
301 .cloned();
302 if let Some(synthetic) = synthetic {
303 (synthetic.id, synthetic.price_increment)
304 } else {
305 log::error!(
306 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
307 );
308 self.manager.cancel_order(&order);
309 return;
310 }
311 } else {
312 let instrument = self
313 .cache
314 .borrow()
315 .instrument(&trigger_instrument_id)
316 .cloned();
317 if let Some(instrument) = instrument {
318 (instrument.id(), instrument.price_increment())
319 } else {
320 log::error!(
321 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
322 );
323 self.manager.cancel_order(&order);
324 return;
325 }
326 };
327
328 self.create_matching_core(instrument_id, price_increment)
329 };
330
331 if matches!(
333 order.order_type(),
334 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
335 ) {
336 self.update_trailing_stop_order(&mut order);
337 if order.trigger_price().is_none() {
338 log::error!(
339 "Cannot handle trailing stop order with no trigger_price and no market updates"
340 );
341
342 self.manager.cancel_order(&order);
343 return;
344 }
345 }
346
347 self.manager.cache_submit_order_command(command);
349
350 matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
352
353 match emulation_trigger.unwrap() {
355 TriggerType::Default | TriggerType::BidAsk => {
356 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
357 if !trigger_instrument_id.is_synthetic() {
358 }
361 self.subscribed_quotes.insert(trigger_instrument_id);
364 }
365 }
366 TriggerType::LastPrice => {
367 if !self.subscribed_trades.contains(&trigger_instrument_id) {
368 self.subscribed_trades.insert(trigger_instrument_id);
371 }
372 }
373 _ => {
374 log::error!("Invalid TriggerType: {emulation_trigger:?}");
375 return;
376 }
377 }
378
379 if !self
381 .manager
382 .get_submit_order_commands()
383 .contains_key(&order.client_order_id())
384 {
385 return; }
387
388 if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
390 log::error!("Cannot add order: {e:?}");
391 return;
392 }
393
394 if order.status() == OrderStatus::Initialized {
396 let event = OrderEmulated::new(
397 order.trader_id(),
398 order.strategy_id(),
399 order.instrument_id(),
400 order.client_order_id(),
401 UUID4::new(),
402 self.clock.borrow().timestamp_ns(),
403 self.clock.borrow().timestamp_ns(),
404 );
405
406 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
407 log::error!("Cannot apply order event: {e:?}");
408 return;
409 }
410
411 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
412 log::error!("Cannot update order: {e:?}");
413 return;
414 }
415
416 self.manager.send_risk_event(OrderEventAny::Emulated(event));
417
418 msgbus::publish(
419 &format!("events.order.{}", order.strategy_id()).into(),
420 &OrderEventAny::Emulated(event),
421 );
422 }
423
424 self.matching_cores
426 .insert(trigger_instrument_id, matching_core);
427
428 log::info!("Emulating {order}");
429 }
430
431 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
432 self.check_monitoring(command.strategy_id, command.position_id);
433
434 for order in &command.order_list.orders {
435 if let Some(parent_order_id) = order.parent_order_id() {
436 let cache = self.cache.borrow();
437 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
438 parent_order
439 } else {
440 log::error!("Parent order for {} not found", order.client_order_id());
441 continue;
442 };
443
444 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
445 continue; }
447 }
448
449 if let Err(e) = self.manager.create_new_submit_order(
450 order,
451 command.position_id,
452 Some(command.client_id),
453 ) {
454 log::error!("Error creating new submit order: {e}");
455 }
456 }
457 }
458
459 fn handle_modify_order(&mut self, command: ModifyOrder) {
460 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
461 let price = match command.price {
462 Some(price) => Some(price),
463 None => order.price(),
464 };
465
466 let trigger_price = match command.trigger_price {
467 Some(trigger_price) => Some(trigger_price),
468 None => order.trigger_price(),
469 };
470
471 let ts_now = self.clock.borrow().timestamp_ns();
473 let event = OrderUpdated::new(
474 order.trader_id(),
475 order.strategy_id(),
476 order.instrument_id(),
477 order.client_order_id(),
478 command.quantity.unwrap_or(order.quantity()),
479 UUID4::new(),
480 ts_now,
481 ts_now,
482 false,
483 order.venue_order_id(),
484 order.account_id(),
485 price,
486 trigger_price,
487 );
488
489 self.manager.send_exec_event(OrderEventAny::Updated(event));
490
491 let trigger_instrument_id = order
492 .trigger_instrument_id()
493 .unwrap_or_else(|| order.instrument_id());
494
495 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
496 matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
497 } else {
498 log::error!(
499 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
500 );
501 }
502 } else {
503 log::error!("Cannot modify order: {} not found", command.client_order_id);
504 }
505 }
506
507 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
508 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
509 order.clone()
510 } else {
511 log::error!("Cannot cancel order: {} not found", command.client_order_id);
512 return;
513 };
514
515 let trigger_instrument_id = order
516 .trigger_instrument_id()
517 .unwrap_or_else(|| order.instrument_id());
518
519 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
520 core
521 } else {
522 self.manager.cancel_order(&order);
523 return;
524 };
525
526 if !matching_core.order_exists(order.client_order_id())
527 && order.is_open()
528 && !order.is_pending_cancel()
529 {
530 self.manager
532 .send_exec_command(TradingCommand::CancelOrder(command));
533 } else {
534 self.manager.cancel_order(&order);
535 }
536 }
537
538 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
539 let matching_core = match self.matching_cores.get(&command.instrument_id) {
540 Some(core) => core,
541 None => return, };
543
544 let orders_to_cancel = match command.order_side {
545 OrderSide::NoOrderSide => {
546 let mut all_orders = Vec::new();
548 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
549 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
550 all_orders
551 }
552 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
553 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
554 };
555
556 for order in orders_to_cancel {
558 self.manager.cancel_order(&OrderAny::from(order));
559 }
560 }
561
562 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
565 log::info!(
566 "Updating order {} quantity to {new_quantity}",
567 order.client_order_id(),
568 );
569
570 let ts_now = self.clock.borrow().timestamp_ns();
572 let event = OrderUpdated::new(
573 order.trader_id(),
574 order.strategy_id(),
575 order.instrument_id(),
576 order.client_order_id(),
577 new_quantity,
578 UUID4::new(),
579 ts_now,
580 ts_now,
581 false,
582 None,
583 order.account_id(),
584 None,
585 None,
586 );
587
588 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
589 log::error!("Cannot apply order event: {e:?}");
590 return;
591 }
592 if let Err(e) = self.cache.borrow_mut().update_order(order) {
593 log::error!("Cannot update order: {e:?}");
594 return;
595 }
596
597 self.manager.send_risk_event(OrderEventAny::Updated(event));
598 }
599
600 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
603 log::debug!("Processing {deltas:?}");
604
605 let instrument_id = &deltas.instrument_id;
606 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
607 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
608 let best_bid = book.best_bid_price();
609 let best_ask = book.best_ask_price();
610
611 if let Some(best_bid) = best_bid {
612 matching_core.set_bid_raw(best_bid);
613 }
614
615 if let Some(best_ask) = best_ask {
616 matching_core.set_ask_raw(best_ask);
617 }
618 } else {
619 log::error!(
620 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
621 deltas.instrument_id
622 );
623 }
624
625 self.iterate_orders(instrument_id);
626 } else {
627 log::error!(
628 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
629 deltas.instrument_id
630 );
631 }
632 }
633
634 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
635 log::debug!("Processing {quote}:?");
636
637 let instrument_id = "e.instrument_id;
638 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
639 matching_core.set_bid_raw(quote.bid_price);
640 matching_core.set_ask_raw(quote.ask_price);
641
642 self.iterate_orders(instrument_id);
643 } else {
644 log::error!(
645 "Cannot handle `QuoteTick`: no matching core for instrument {}",
646 quote.instrument_id
647 );
648 }
649 }
650
651 pub fn on_trade_tick(&mut self, trade: TradeTick) {
652 log::debug!("Processing {trade:?}");
653
654 let instrument_id = &trade.instrument_id;
655 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
656 matching_core.set_last_raw(trade.price);
657 if !self.subscribed_quotes.contains(instrument_id) {
658 matching_core.set_bid_raw(trade.price);
659 matching_core.set_ask_raw(trade.price);
660 }
661
662 self.iterate_orders(instrument_id);
663 } else {
664 log::error!(
665 "Cannot handle `TradeTick`: no matching core for instrument {}",
666 trade.instrument_id
667 );
668 }
669 }
670
671 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
672 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
673 matching_core.iterate();
674
675 matching_core.get_orders()
676 } else {
677 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
678 return;
679 };
680
681 for order in orders {
682 if order.is_closed() {
683 continue;
684 }
685
686 let mut order: OrderAny = order.clone().into();
687 if matches!(
688 order.order_type(),
689 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
690 ) {
691 self.update_trailing_stop_order(&mut order);
692 }
693 }
694 }
695
696 pub fn cancel_order(&mut self, order: &OrderAny) {
697 log::info!("Canceling order {}", order.client_order_id());
698
699 let mut order = order.clone();
700 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
701
702 let trigger_instrument_id = order
703 .trigger_instrument_id()
704 .unwrap_or(order.instrument_id());
705
706 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
707 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
708 log::error!("Cannot delete order: {e:?}");
709 }
710 }
711
712 self.cache
713 .borrow_mut()
714 .update_order_pending_cancel_local(&order);
715
716 let ts_now = self.clock.borrow().timestamp_ns();
718 let event = OrderCanceled::new(
719 order.trader_id(),
720 order.strategy_id(),
721 order.instrument_id(),
722 order.client_order_id(),
723 UUID4::new(),
724 ts_now,
725 ts_now,
726 false,
727 order.venue_order_id(),
728 order.account_id(),
729 );
730
731 self.manager.send_exec_event(OrderEventAny::Canceled(event));
732 }
733
734 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
735 if !self.subscribed_strategies.contains(&strategy_id) {
736 if let Some(handler) = &self.on_event_handler {
738 msgbus::subscribe(format!("events.order.{strategy_id}"), handler.clone(), None);
739 msgbus::subscribe(
740 format!("events.position.{strategy_id}"),
741 handler.clone(),
742 None,
743 );
744 self.subscribed_strategies.insert(strategy_id);
745 log::info!("Subscribed to strategy {strategy_id} order and position events");
746 }
747 }
748
749 if let Some(position_id) = position_id {
750 if !self.monitored_positions.contains(&position_id) {
751 self.monitored_positions.insert(position_id);
752 }
753 }
754 }
755
756 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
757 match order.order_type() {
758 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
759 self.fill_limit_order(order);
760 }
761 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
762 self.fill_market_order(order);
763 }
764 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
765 }
766 }
767
768 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
769 if matches!(order.order_type(), OrderType::Limit) {
770 self.fill_market_order(order);
771 return;
772 }
773
774 let mut command = match self
776 .manager
777 .pop_submit_order_command(order.client_order_id())
778 {
779 Some(command) => command,
780 None => return, };
782
783 let trigger_instrument_id = order
784 .trigger_instrument_id()
785 .unwrap_or(order.instrument_id());
786
787 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
788 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
789 log::error!("Error deleting order: {e:?}");
790 }
791
792 let emulation_trigger = TriggerType::NoTrigger;
793
794 let mut transformed = if let Ok(transformed) = LimitOrder::new(
796 order.trader_id(),
797 order.strategy_id(),
798 order.instrument_id(),
799 order.client_order_id(),
800 order.order_side(),
801 order.quantity(),
802 order.price().unwrap(),
803 order.time_in_force(),
804 order.expire_time(),
805 order.is_post_only(),
806 order.is_reduce_only(),
807 order.is_quote_quantity(),
808 order.display_qty(),
809 Some(emulation_trigger),
810 Some(trigger_instrument_id),
811 order.contingency_type(),
812 order.order_list_id(),
813 order.linked_order_ids().map(Vec::from),
814 order.parent_order_id(),
815 order.exec_algorithm_id(),
816 order.exec_algorithm_params().cloned(),
817 order.exec_spawn_id(),
818 order.tags().map(Vec::from),
819 UUID4::new(),
820 self.clock.borrow().timestamp_ns(),
821 ) {
822 transformed
823 } else {
824 log::error!("Cannot create limit order");
825 return;
826 };
827 transformed.liquidity_side = order.liquidity_side();
828
829 let original_events = order.events();
836
837 for event in original_events {
838 transformed.events.insert(0, event.clone());
839 }
840
841 if let Err(e) = self.cache.borrow_mut().add_order(
842 OrderAny::Limit(transformed.clone()),
843 command.position_id,
844 Some(command.client_id),
845 true,
846 ) {
847 log::error!("Failed to add order: {e}");
848 }
849
850 command.order = OrderAny::Limit(transformed.clone());
852
853 msgbus::publish(
854 &format!("events.order.{}", order.strategy_id()).into(),
855 transformed.last_event(),
856 );
857
858 let released_price = match order.order_side() {
860 OrderSide::Buy => matching_core.ask,
861 OrderSide::Sell => matching_core.bid,
862 _ => panic!("invalid `OrderSide`"),
863 };
864
865 let event = OrderReleased::new(
867 order.trader_id(),
868 order.strategy_id(),
869 order.instrument_id(),
870 order.client_order_id(),
871 released_price.unwrap(),
872 UUID4::new(),
873 self.clock.borrow().timestamp_ns(),
874 self.clock.borrow().timestamp_ns(),
875 );
876
877 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
878 log::error!("Failed to apply order event: {e}");
879 }
880
881 if let Err(e) = self
882 .cache
883 .borrow_mut()
884 .update_order(&OrderAny::Limit(transformed.clone()))
885 {
886 log::error!("Failed to update order: {e}");
887 }
888
889 self.manager.send_risk_event(OrderEventAny::Released(event));
890
891 log::info!("Releasing order {}", order.client_order_id());
892
893 msgbus::publish(
895 &format!("events.order.{}", transformed.strategy_id()).into(),
896 &OrderEventAny::Released(event),
897 );
898
899 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
900 self.manager.send_algo_command(command, exec_algorithm_id);
901 } else {
902 self.manager
903 .send_exec_command(TradingCommand::SubmitOrder(command));
904 }
905 } else {
906 log::error!(
907 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
908 );
909 }
910 }
911
912 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
913 let mut command = match self
915 .manager
916 .pop_submit_order_command(order.client_order_id())
917 {
918 Some(command) => command,
919 None => panic!("invalid operation `_fill_market_order` with no command"),
920 };
921
922 let trigger_instrument_id = order
923 .trigger_instrument_id()
924 .unwrap_or(order.instrument_id());
925
926 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
927 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
928 log::error!("Cannot delete order: {e:?}");
929 }
930
931 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
932
933 let mut transformed = MarketOrder::new(
935 order.trader_id(),
936 order.strategy_id(),
937 order.instrument_id(),
938 order.client_order_id(),
939 order.order_side(),
940 order.quantity(),
941 order.time_in_force(),
942 UUID4::new(),
943 self.clock.borrow().timestamp_ns(),
944 order.is_reduce_only(),
945 order.is_quote_quantity(),
946 order.contingency_type(),
947 order.order_list_id(),
948 order.linked_order_ids().map(Vec::from),
949 order.parent_order_id(),
950 order.exec_algorithm_id(),
951 order.exec_algorithm_params().cloned(),
952 order.exec_spawn_id(),
953 order.tags().map(Vec::from),
954 );
955
956 let original_events = order.events();
957
958 for event in original_events {
959 transformed.events.insert(0, event.clone());
960 }
961
962 if let Err(e) = self.cache.borrow_mut().add_order(
963 OrderAny::Market(transformed.clone()),
964 command.position_id,
965 Some(command.client_id),
966 true,
967 ) {
968 log::error!("Failed to add order: {e}");
969 }
970
971 command.order = OrderAny::Market(transformed.clone());
973
974 msgbus::publish(
975 &format!("events.order.{}", order.strategy_id()).into(),
976 transformed.last_event(),
977 );
978
979 let released_price = match order.order_side() {
982 OrderSide::Buy => matching_core.ask,
983 OrderSide::Sell => matching_core.bid,
984 _ => panic!("invalid `OrderSide`"),
985 };
986
987 let ts_now = self.clock.borrow().timestamp_ns();
989 let event = OrderReleased::new(
990 order.trader_id(),
991 order.strategy_id(),
992 order.instrument_id(),
993 order.client_order_id(),
994 released_price.unwrap(),
995 UUID4::new(),
996 ts_now,
997 ts_now,
998 );
999
1000 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1001 log::error!("Failed to apply order event: {e}");
1002 }
1003
1004 if let Err(e) = self
1005 .cache
1006 .borrow_mut()
1007 .update_order(&OrderAny::Market(transformed))
1008 {
1009 log::error!("Failed to update order: {e}");
1010 }
1011 self.manager.send_risk_event(OrderEventAny::Released(event));
1012
1013 log::info!("Releasing order {}", order.client_order_id());
1014
1015 msgbus::publish(
1017 &format!("events.order.{}", order.strategy_id()).into(),
1018 &OrderEventAny::Released(event),
1019 );
1020
1021 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1022 self.manager.send_algo_command(command, exec_algorithm_id);
1023 } else {
1024 self.manager
1025 .send_exec_command(TradingCommand::SubmitOrder(command));
1026 }
1027 } else {
1028 log::error!(
1029 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1030 );
1031 }
1032 }
1033
1034 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1035 if let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) {
1036 let mut bid = None;
1037 let mut ask = None;
1038 let mut last = None;
1039
1040 if matching_core.is_bid_initialized {
1041 bid = matching_core.bid;
1042 }
1043 if matching_core.is_ask_initialized {
1044 ask = matching_core.ask;
1045 }
1046 if matching_core.is_last_initialized {
1047 last = matching_core.last;
1048 }
1049
1050 let quote_tick = self
1051 .cache
1052 .borrow()
1053 .quote(&matching_core.instrument_id)
1054 .copied();
1055 let trade_tick = self
1056 .cache
1057 .borrow()
1058 .trade(&matching_core.instrument_id)
1059 .copied();
1060
1061 if bid.is_none() && quote_tick.is_some() {
1062 bid = Some(quote_tick.unwrap().bid_price);
1063 }
1064 if ask.is_none() && quote_tick.is_some() {
1065 ask = Some(quote_tick.unwrap().ask_price);
1066 }
1067 if last.is_none() && trade_tick.is_some() {
1068 last = Some(trade_tick.unwrap().price);
1069 }
1070
1071 let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1072 trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1073 {
1074 (new_trigger_price, new_price)
1075 } else {
1076 log::warn!("Cannot calculate trailing stop order");
1077 return;
1078 };
1079
1080 let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1081 (None, None) => return, _ => (new_trigger_price, new_price),
1083 };
1084
1085 let ts_now = self.clock.borrow().timestamp_ns();
1087 let event = OrderUpdated::new(
1088 order.trader_id(),
1089 order.strategy_id(),
1090 order.instrument_id(),
1091 order.client_order_id(),
1092 order.quantity(),
1093 UUID4::new(),
1094 ts_now,
1095 ts_now,
1096 false,
1097 order.venue_order_id(),
1098 order.account_id(),
1099 new_price,
1100 new_trigger_price,
1101 );
1102
1103 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1104 log::error!("Failed to apply order event: {e}");
1105 }
1106 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1107 log::error!("Failed to update order: {e}");
1108 }
1109
1110 self.manager.send_risk_event(OrderEventAny::Updated(event));
1111 } else {
1112 log::error!(
1113 "Cannot update trailing stop order: no matching core for instrument {}",
1114 order.instrument_id()
1115 );
1116 }
1117 }
1118}