1use std::{
17 cell::RefCell,
18 collections::{HashMap, HashSet},
19 rc::Rc,
20};
21
22use anyhow::Result;
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 logging::{CMD, EVT, RECV},
27 msgbus::{MessageBus, handler::ShareableMessageHandler},
28};
29use nautilus_core::uuid::UUID4;
30use nautilus_model::{
31 data::{OrderBookDeltas, QuoteTick, TradeTick},
32 enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
33 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
34 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
35 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
36 types::{Price, Quantity},
37};
38
39use crate::{
40 matching_core::OrderMatchingCore,
41 messages::{
42 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
43 cancel::CancelOrderHandlerAny, modify::ModifyOrderHandlerAny,
44 submit::SubmitOrderHandlerAny,
45 },
46 order_manager::manager::OrderManager,
47 trailing::trailing_stop_calculate,
48};
49
/// Emulates orders locally and releases them to execution as market or limit orders.
///
/// Incoming market data updates the per-instrument matching cores, while an
/// `OrderManager` keeps track of the cached submit commands.
pub struct OrderEmulator {
    /// Clock used to timestamp generated events and commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to read and update orders, instruments, books and ticks.
    cache: Rc<RefCell<Cache>>,
    /// Message bus used to subscribe to strategy topics and publish order events.
    msgbus: Rc<RefCell<MessageBus>>,
    /// Order manager that caches submit commands and forwards events and commands.
    manager: OrderManager,
    /// Matching cores keyed by the instrument whose prices trigger the orders.
    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
    /// Trigger instruments registered for `Default`/`BidAsk` (quote based) emulation.
    subscribed_quotes: HashSet<InstrumentId>,
    /// Trigger instruments registered for `LastPrice` (trade based) emulation.
    subscribed_trades: HashSet<InstrumentId>,
    /// Strategies whose order and position event topics have been subscribed to.
    subscribed_strategies: HashSet<StrategyId>,
    /// Position IDs seen on submitted commands.
    monitored_positions: HashSet<PositionId>,
    /// Handler subscribed to strategy order and position event topics, once set.
    on_event_handler: Option<ShareableMessageHandler>,
}
62
63impl OrderEmulator {
64 pub fn new(
65 clock: Rc<RefCell<dyn Clock>>,
66 cache: Rc<RefCell<Cache>>,
67 msgbus: Rc<RefCell<MessageBus>>,
68 ) -> Self {
69 let active_local = true;
73 let manager = OrderManager::new(
74 clock.clone(),
75 msgbus.clone(),
76 cache.clone(),
77 active_local,
78 None,
79 None,
80 None,
81 );
82
83 Self {
84 clock,
85 cache,
86 msgbus,
87 manager,
88 matching_cores: HashMap::new(),
89 subscribed_quotes: HashSet::new(),
90 subscribed_trades: HashSet::new(),
91 subscribed_strategies: HashSet::new(),
92 monitored_positions: HashSet::new(),
93 on_event_handler: None,
94 }
95 }
96
    /// Stores the handler that `check_monitoring` subscribes to strategy order and
    /// position event topics, replacing any previously stored handler.
    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
        self.on_event_handler = Some(handler);
    }
100
    /// Forwards the submit order handler to the internal order manager.
    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
        self.manager.set_submit_order_handler(handler);
    }
104
    /// Forwards the cancel order handler to the internal order manager.
    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
        self.manager.set_cancel_order_handler(handler);
    }
108
    /// Forwards the modify order handler to the internal order manager.
    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
        self.manager.set_modify_order_handler(handler);
    }
112
113 #[must_use]
114 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
115 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
116 quotes.sort();
117 quotes
118 }
119
120 #[must_use]
121 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
122 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
123 trades.sort();
124 trades
125 }
126
    /// Returns the submit order commands held by the order manager, keyed by client order ID.
    #[must_use]
    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
        self.manager.get_submit_order_commands()
    }
131
132 #[must_use]
133 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
134 self.matching_cores.get(instrument_id).cloned()
135 }
136
137 pub fn on_start(&mut self) -> Result<()> {
138 let emulated_orders: Vec<OrderAny> = self
139 .cache
140 .borrow()
141 .orders_emulated(None, None, None, None)
142 .into_iter()
143 .cloned()
144 .collect();
145
146 if emulated_orders.is_empty() {
147 log::error!("No emulated orders to reactivate");
148 return Ok(());
149 }
150
151 for order in emulated_orders {
152 if !matches!(
153 order.status(),
154 OrderStatus::Initialized | OrderStatus::Emulated
155 ) {
156 continue; }
158
159 if let Some(parent_order_id) = &order.parent_order_id() {
160 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
161 order.clone()
162 } else {
163 log::error!("Cannot handle order: parent {parent_order_id} not found");
164 continue;
165 };
166
167 let is_position_closed = parent_order
168 .position_id()
169 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
170 if parent_order.is_closed() && is_position_closed {
171 self.manager.cancel_order(&order);
172 continue; }
174
175 if parent_order.contingency_type() == Some(ContingencyType::Oto)
176 && (parent_order.is_active_local()
177 || parent_order.filled_qty() == Quantity::zero(0))
178 {
179 continue; }
181 }
182
183 let position_id = self
184 .cache
185 .borrow()
186 .position_id(&order.client_order_id())
187 .copied();
188 let client_id = self
189 .cache
190 .borrow()
191 .client_id(&order.client_order_id())
192 .copied();
193
194 let command = SubmitOrder::new(
195 order.trader_id(),
196 client_id.unwrap(),
197 order.strategy_id(),
198 order.instrument_id(),
199 order.client_order_id(),
200 order.venue_order_id().unwrap(),
201 order.clone(),
202 order.exec_algorithm_id(),
203 position_id,
204 UUID4::new(),
205 self.clock.borrow().timestamp_ns(),
206 )?;
207
208 self.handle_submit_order(command);
209 }
210
211 Ok(())
212 }
213
214 pub fn on_event(&mut self, event: OrderEventAny) {
215 log::info!("{RECV}{EVT} {event}");
216
217 self.manager.handle_event(event.clone());
218
219 if let Some(order) = self.cache.borrow().order(&event.client_order_id()) {
220 if order.is_closed() {
221 if let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id()) {
222 if let Err(e) =
223 matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
224 {
225 log::error!("Error deleting order: {}", e);
226 }
227 }
228 }
229 }
230 }
232
    /// Lifecycle hook invoked on stop; currently a no-op.
    pub const fn on_stop(&self) {}
234
    /// Resets the order manager and drops every matching core.
    ///
    /// The quote/trade/strategy/position tracking sets are left untouched.
    pub fn on_reset(&mut self) {
        self.manager.reset();
        self.matching_cores.clear();
    }
239
    /// Lifecycle hook invoked on dispose; currently a no-op.
    pub const fn on_dispose(&self) {}
241
242 pub fn execute(&mut self, command: TradingCommand) {
243 log::info!("{RECV}{CMD} {command}");
244
245 match command {
246 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
247 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
248 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
249 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
250 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
251 _ => log::error!("Cannot handle command: unrecognized {:?}", command),
252 }
253 }
254
255 fn create_matching_core(
256 &mut self,
257 instrument_id: InstrumentId,
258 price_increment: Price,
259 ) -> OrderMatchingCore {
260 let matching_core =
261 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
262 self.matching_cores
263 .insert(instrument_id, matching_core.clone());
264 log::info!("Creating matching core for {:?}", instrument_id);
265 matching_core
266 }
267
    /// Starts emulating the order carried by a `SubmitOrder` command.
    ///
    /// Steps, in order:
    /// 1. Validates the emulation trigger; unsupported triggers cancel the order.
    /// 2. Registers strategy/position monitoring via `check_monitoring`.
    /// 3. Obtains a working copy of the matching core for the trigger instrument,
    ///    creating one from the cached (synthetic) instrument when none exists; a
    ///    missing instrument cancels the order.
    /// 4. For trailing stop orders, computes the initial trigger price; the order is
    ///    canceled if no trigger price results.
    /// 5. Caches the command, matches the order once, and records the trigger
    ///    instrument in the quote or trade subscription set.
    /// 6. Adds the order to the matching core and, if the order is still `Initialized`,
    ///    applies and publishes an `OrderEmulated` event.
    /// 7. Stores the working copy of the matching core back into `matching_cores`.
    ///
    /// # Panics
    ///
    /// Panics if the order's emulation trigger is `TriggerType::NoTrigger`, or if the
    /// order's client order ID is not present in the manager's submit order commands.
    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
        let mut order = command.order.clone();
        let emulation_trigger = order.emulation_trigger();

        assert_ne!(
            emulation_trigger,
            Some(TriggerType::NoTrigger),
            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
        );
        // NOTE(review): this requires the command to already be cached, yet the command
        // is only cached further below via `cache_submit_order_command` — confirm whether
        // the intended precondition is "must NOT be in submit_order_commands".
        assert!(
            self.manager
                .get_submit_order_commands()
                .contains_key(&order.client_order_id()),
            "command.order.client_order_id must be in submit_order_commands"
        );

        // Only these three trigger types can be emulated; anything else is canceled
        if !matches!(
            emulation_trigger,
            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
        ) {
            log::error!(
                "Cannot emulate order: `TriggerType` {:?} not supported",
                emulation_trigger
            );
            self.manager.cancel_order(&order);
            return;
        }

        self.check_monitoring(command.strategy_id, command.position_id);

        // Fall back to the order's own instrument when no trigger instrument is set
        let trigger_instrument_id = order
            .trigger_instrument_id()
            .unwrap_or_else(|| order.instrument_id());

        // Work on a clone of the registered core; it is written back at the very end
        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();

        let mut matching_core = if let Some(core) = matching_core {
            core
        } else {
            // No core yet: resolve the instrument ID and price increment from the cache
            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
                let synthetic = self
                    .cache
                    .borrow()
                    .synthetic(&trigger_instrument_id)
                    .cloned();
                if let Some(synthetic) = synthetic {
                    (synthetic.id, synthetic.price_increment)
                } else {
                    log::error!(
                        "Cannot emulate order: no synthetic instrument {} for trigger",
                        trigger_instrument_id
                    );
                    self.manager.cancel_order(&order);
                    return;
                }
            } else {
                let instrument = self
                    .cache
                    .borrow()
                    .instrument(&trigger_instrument_id)
                    .cloned();
                if let Some(instrument) = instrument {
                    (instrument.id(), instrument.price_increment())
                } else {
                    log::error!(
                        "Cannot emulate order: no instrument {} for trigger",
                        trigger_instrument_id
                    );
                    self.manager.cancel_order(&order);
                    return;
                }
            };

            self.create_matching_core(instrument_id, price_increment)
        };

        // Trailing stops need an initial trigger price before they can be emulated
        if matches!(
            order.order_type(),
            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
        ) {
            self.update_trailing_stop_order(&mut order);
            if order.trigger_price().is_none() {
                log::error!(
                    "Cannot handle trailing stop order with no trigger_price and no market updates"
                );

                self.manager.cancel_order(&order);
                return;
            }
        }

        self.manager.cache_submit_order_command(command);

        // Run one match pass against the working copy of the core
        matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);

        // Record which market data stream drives this trigger instrument
        match emulation_trigger.unwrap() {
            TriggerType::Default | TriggerType::BidAsk => {
                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
                    // NOTE(review): empty branch — nothing is done for non-synthetic
                    // instruments here; presumably a data subscription is still to be added
                    if !trigger_instrument_id.is_synthetic() {
                    }
                    self.subscribed_quotes.insert(trigger_instrument_id);
                }
            }
            TriggerType::LastPrice => {
                if !self.subscribed_trades.contains(&trigger_instrument_id) {
                    self.subscribed_trades.insert(trigger_instrument_id);
                }
            }
            // Not reachable in practice: other trigger types were rejected above
            _ => {
                log::error!("Invalid TriggerType: {:?}", emulation_trigger);
                return;
            }
        }

        // The command is no longer cached (e.g. removed during the match pass above),
        // so there is nothing left to emulate
        if !self
            .manager
            .get_submit_order_commands()
            .contains_key(&order.client_order_id())
        {
            return;
        }

        if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
            log::error!("Cannot add order: {:?}", e);
            return;
        }

        // Only a freshly initialized order transitions to the emulated state
        if order.status() == OrderStatus::Initialized {
            let event = OrderEmulated::new(
                order.trader_id(),
                order.strategy_id(),
                order.instrument_id(),
                order.client_order_id(),
                UUID4::new(),
                self.clock.borrow().timestamp_ns(),
                self.clock.borrow().timestamp_ns(),
            );

            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
                log::error!("Cannot apply order event: {:?}", e);
                return;
            }

            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
                log::error!("Cannot update order: {:?}", e);
                return;
            }

            self.manager.send_risk_event(OrderEventAny::Emulated(event));

            self.msgbus.borrow().publish(
                &format!("events.order.{}", order.strategy_id()).into(),
                &OrderEventAny::Emulated(event),
            );
        }

        // Persist the working copy, which now contains the newly added order
        self.matching_cores
            .insert(trigger_instrument_id, matching_core);

        log::info!("Emulating {}", order);
    }
445
446 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
447 self.check_monitoring(command.strategy_id, command.position_id);
448
449 for order in &command.order_list.orders {
450 if let Some(parent_order_id) = order.parent_order_id() {
451 let cache = self.cache.borrow();
452 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
453 parent_order
454 } else {
455 log::error!("Parent order for {} not found", order.client_order_id());
456 continue;
457 };
458
459 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
460 continue; }
462 }
463
464 if let Err(e) = self.manager.create_new_submit_order(
465 order,
466 command.position_id,
467 Some(command.client_id),
468 ) {
469 log::error!("Error creating new submit order: {}", e);
470 }
471 }
472 }
473
474 fn handle_modify_order(&mut self, command: ModifyOrder) {
475 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
476 let price = match command.price {
477 Some(price) => Some(price),
478 None => order.price(),
479 };
480
481 let trigger_price = match command.trigger_price {
482 Some(trigger_price) => Some(trigger_price),
483 None => order.trigger_price(),
484 };
485
486 let ts_now = self.clock.borrow().timestamp_ns();
488 let event = OrderUpdated::new(
489 order.trader_id(),
490 order.strategy_id(),
491 order.instrument_id(),
492 order.client_order_id(),
493 command.quantity.unwrap_or(order.quantity()),
494 UUID4::new(),
495 ts_now,
496 ts_now,
497 false,
498 order.venue_order_id(),
499 order.account_id(),
500 price,
501 trigger_price,
502 );
503
504 self.manager.send_exec_event(OrderEventAny::Updated(event));
505
506 let trigger_instrument_id = order
507 .trigger_instrument_id()
508 .unwrap_or_else(|| order.instrument_id());
509
510 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
511 matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
512 } else {
513 log::error!(
514 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {}",
515 trigger_instrument_id
516 );
517 }
518 } else {
519 log::error!("Cannot modify order: {} not found", command.client_order_id);
520 }
521 }
522
523 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
524 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
525 order.clone()
526 } else {
527 log::error!("Cannot cancel order: {} not found", command.client_order_id);
528 return;
529 };
530
531 let trigger_instrument_id = order
532 .trigger_instrument_id()
533 .unwrap_or_else(|| order.instrument_id());
534
535 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
536 core
537 } else {
538 self.manager.cancel_order(&order);
539 return;
540 };
541
542 if !matching_core.order_exists(order.client_order_id())
543 && order.is_open()
544 && !order.is_pending_cancel()
545 {
546 self.manager
548 .send_exec_command(TradingCommand::CancelOrder(command));
549 } else {
550 self.manager.cancel_order(&order);
551 }
552 }
553
554 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
555 let matching_core = match self.matching_cores.get(&command.instrument_id) {
556 Some(core) => core,
557 None => return, };
559
560 let orders_to_cancel = match command.order_side {
561 OrderSide::NoOrderSide => {
562 let mut all_orders = Vec::new();
564 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
565 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
566 all_orders
567 }
568 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
569 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
570 };
571
572 for order in orders_to_cancel {
574 self.manager.cancel_order(&OrderAny::from(order));
575 }
576 }
577
578 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
581 log::info!(
582 "Updating order {} quantity to {}",
583 order.client_order_id(),
584 new_quantity
585 );
586
587 let ts_now = self.clock.borrow().timestamp_ns();
589 let event = OrderUpdated::new(
590 order.trader_id(),
591 order.strategy_id(),
592 order.instrument_id(),
593 order.client_order_id(),
594 new_quantity,
595 UUID4::new(),
596 ts_now,
597 ts_now,
598 false,
599 None,
600 order.account_id(),
601 None,
602 None,
603 );
604
605 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
606 log::error!("Cannot apply order event: {:?}", e);
607 return;
608 }
609 if let Err(e) = self.cache.borrow_mut().update_order(order) {
610 log::error!("Cannot update order: {:?}", e);
611 return;
612 }
613
614 self.manager.send_risk_event(OrderEventAny::Updated(event));
615 }
616
617 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
620 log::debug!("Processing OrderBookDeltas:{}", deltas);
621
622 let instrument_id = &deltas.instrument_id;
623 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
624 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
625 let best_bid = book.best_bid_price();
626 let best_ask = book.best_ask_price();
627
628 if let Some(best_bid) = best_bid {
629 matching_core.set_bid_raw(best_bid);
630 }
631
632 if let Some(best_ask) = best_ask {
633 matching_core.set_ask_raw(best_ask);
634 }
635 } else {
636 log::error!(
637 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
638 deltas.instrument_id
639 );
640 }
641
642 self.iterate_orders(instrument_id);
643 } else {
644 log::error!(
645 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
646 deltas.instrument_id
647 );
648 }
649 }
650
651 pub fn on_quote_tick(&mut self, tick: QuoteTick) {
652 log::debug!("Processing QuoteTick:{}", tick);
653
654 let instrument_id = &tick.instrument_id;
655 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
656 matching_core.set_bid_raw(tick.bid_price);
657 matching_core.set_ask_raw(tick.ask_price);
658
659 self.iterate_orders(instrument_id);
660 } else {
661 log::error!(
662 "Cannot handle `QuoteTick`: no matching core for instrument {}",
663 tick.instrument_id
664 );
665 }
666 }
667
668 pub fn on_trade_tick(&mut self, tick: TradeTick) {
669 log::debug!("Processing TradeTick:{}", tick);
670
671 let instrument_id = &tick.instrument_id;
672 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
673 matching_core.set_last_raw(tick.price);
674 if !self.subscribed_quotes.contains(instrument_id) {
675 matching_core.set_bid_raw(tick.price);
676 matching_core.set_ask_raw(tick.price);
677 }
678
679 self.iterate_orders(instrument_id);
680 } else {
681 log::error!(
682 "Cannot handle `TradeTick`: no matching core for instrument {}",
683 tick.instrument_id
684 );
685 }
686 }
687
688 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
689 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
690 matching_core.iterate();
691
692 matching_core.get_orders()
693 } else {
694 log::error!(
695 "Cannot iterate orders: no matching core for instrument {}",
696 instrument_id
697 );
698 return;
699 };
700
701 for order in orders {
702 if order.is_closed() {
703 continue;
704 }
705
706 let mut order: OrderAny = order.clone().into();
707 if matches!(
708 order.order_type(),
709 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
710 ) {
711 self.update_trailing_stop_order(&mut order);
712 }
713 }
714 }
715
716 pub fn cancel_order(&mut self, order: &OrderAny) {
717 log::info!("Canceling order {}", order.client_order_id());
718
719 let mut order = order.clone();
720 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
721
722 let trigger_instrument_id = order
723 .trigger_instrument_id()
724 .unwrap_or(order.instrument_id());
725
726 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
727 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
728 log::error!("Cannot delete order: {:?}", e);
729 }
730 }
731
732 self.cache
733 .borrow_mut()
734 .update_order_pending_cancel_local(&order);
735
736 let ts_now = self.clock.borrow().timestamp_ns();
738 let event = OrderCanceled::new(
739 order.trader_id(),
740 order.strategy_id(),
741 order.instrument_id(),
742 order.client_order_id(),
743 UUID4::new(),
744 ts_now,
745 ts_now,
746 false,
747 order.venue_order_id(),
748 order.account_id(),
749 );
750
751 self.manager.send_exec_event(OrderEventAny::Canceled(event));
752 }
753
754 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
755 if !self.subscribed_strategies.contains(&strategy_id) {
756 if let Some(handler) = &self.on_event_handler {
758 self.msgbus.borrow_mut().subscribe(
759 format!("events.order.{strategy_id}"),
760 handler.clone(),
761 None,
762 );
763 self.msgbus.borrow_mut().subscribe(
764 format!("events.position.{strategy_id}"),
765 handler.clone(),
766 None,
767 );
768 self.subscribed_strategies.insert(strategy_id);
769 log::info!(
770 "Subscribed to strategy {} order and position events",
771 strategy_id
772 );
773 }
774 }
775
776 if let Some(position_id) = position_id {
777 if !self.monitored_positions.contains(&position_id) {
778 self.monitored_positions.insert(position_id);
779 }
780 }
781 }
782
783 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
784 match order.order_type() {
785 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
786 self.fill_limit_order(order);
787 }
788 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
789 self.fill_market_order(order);
790 }
791 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
792 }
793 }
794
795 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
796 if matches!(order.order_type(), OrderType::Limit) {
797 self.fill_market_order(order);
798 return;
799 }
800
801 let mut command = match self
803 .manager
804 .pop_submit_order_command(order.client_order_id())
805 {
806 Some(command) => command,
807 None => return, };
809
810 let trigger_instrument_id = order
811 .trigger_instrument_id()
812 .unwrap_or(order.instrument_id());
813
814 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
815 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
816 log::error!("Error deleting order: {:?}", e);
817 }
818
819 let emulation_trigger = TriggerType::NoTrigger;
820
821 let mut transformed = if let Ok(transformed) = LimitOrder::new(
823 order.trader_id(),
824 order.strategy_id(),
825 order.instrument_id(),
826 order.client_order_id(),
827 order.order_side(),
828 order.quantity(),
829 order.price().unwrap(),
830 order.time_in_force(),
831 order.expire_time(),
832 order.is_post_only(),
833 order.is_reduce_only(),
834 order.is_quote_quantity(),
835 order.display_qty(),
836 Some(emulation_trigger),
837 Some(trigger_instrument_id),
838 order.contingency_type(),
839 order.order_list_id(),
840 order.linked_order_ids(),
841 order.parent_order_id(),
842 order.exec_algorithm_id(),
843 order.exec_algorithm_params(),
844 order.exec_spawn_id(),
845 order.tags(),
846 UUID4::new(),
847 self.clock.borrow().timestamp_ns(),
848 ) {
849 transformed
850 } else {
851 log::error!("Cannot create limit order");
852 return;
853 };
854 transformed.liquidity_side = order.liquidity_side();
855
856 let original_events = order.events();
863
864 for event in original_events {
865 transformed.events.insert(0, event.clone());
866 }
867
868 if let Err(e) = self.cache.borrow_mut().add_order(
869 OrderAny::Limit(transformed.clone()),
870 command.position_id,
871 Some(command.client_id),
872 true,
873 ) {
874 log::error!("Failed to add order: {}", e);
875 }
876
877 command.order = OrderAny::Limit(transformed.clone());
879
880 self.msgbus.borrow().publish(
881 &format!("events.order.{}", order.strategy_id()).into(),
882 transformed.last_event(),
883 );
884
885 let released_price = match order.order_side() {
887 OrderSide::Buy => matching_core.ask,
888 OrderSide::Sell => matching_core.bid,
889 _ => panic!("invalid `OrderSide`"),
890 };
891
892 let event = OrderReleased::new(
894 order.trader_id(),
895 order.strategy_id(),
896 order.instrument_id(),
897 order.client_order_id(),
898 released_price.unwrap(),
899 UUID4::new(),
900 self.clock.borrow().timestamp_ns(),
901 self.clock.borrow().timestamp_ns(),
902 );
903
904 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
905 log::error!("Failed to apply order event: {}", e);
906 }
907
908 if let Err(e) = self
909 .cache
910 .borrow_mut()
911 .update_order(&OrderAny::Limit(transformed.clone()))
912 {
913 log::error!("Failed to update order: {}", e);
914 }
915
916 self.manager.send_risk_event(OrderEventAny::Released(event));
917
918 log::info!("Releasing order {}", order.client_order_id());
919
920 self.msgbus.borrow().publish(
922 &format!("events.order.{}", transformed.strategy_id()).into(),
923 &OrderEventAny::Released(event),
924 );
925
926 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
927 self.manager.send_algo_command(command, exec_algorithm_id);
928 } else {
929 self.manager
930 .send_exec_command(TradingCommand::SubmitOrder(command));
931 }
932 } else {
933 log::error!(
934 "Cannot fill limit order: no matching core for instrument {}",
935 trigger_instrument_id
936 );
937 }
938 }
939
940 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
941 let mut command = match self
943 .manager
944 .pop_submit_order_command(order.client_order_id())
945 {
946 Some(command) => command,
947 None => panic!("invalid operation `_fill_market_order` with no command"),
948 };
949
950 let trigger_instrument_id = order
951 .trigger_instrument_id()
952 .unwrap_or(order.instrument_id());
953
954 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
955 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
956 log::error!("Cannot delete order: {:?}", e);
957 }
958
959 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
960
961 let mut transformed = MarketOrder::new(
963 order.trader_id(),
964 order.strategy_id(),
965 order.instrument_id(),
966 order.client_order_id(),
967 order.order_side(),
968 order.quantity(),
969 order.time_in_force(),
970 UUID4::new(),
971 self.clock.borrow().timestamp_ns(),
972 order.is_reduce_only(),
973 order.is_quote_quantity(),
974 order.contingency_type(),
975 order.order_list_id(),
976 order.linked_order_ids(),
977 order.parent_order_id(),
978 order.exec_algorithm_id(),
979 order.exec_algorithm_params(),
980 order.exec_spawn_id(),
981 order.tags(),
982 );
983
984 let original_events = order.events();
985
986 for event in original_events {
987 transformed.events.insert(0, event.clone());
988 }
989
990 if let Err(e) = self.cache.borrow_mut().add_order(
991 OrderAny::Market(transformed.clone()),
992 command.position_id,
993 Some(command.client_id),
994 true,
995 ) {
996 log::error!("Failed to add order: {}", e);
997 }
998
999 command.order = OrderAny::Market(transformed.clone());
1001
1002 self.msgbus.borrow().publish(
1003 &format!("events.order.{}", order.strategy_id()).into(),
1004 transformed.last_event(),
1005 );
1006
1007 let released_price = match order.order_side() {
1010 OrderSide::Buy => matching_core.ask,
1011 OrderSide::Sell => matching_core.bid,
1012 _ => panic!("invalid `OrderSide`"),
1013 };
1014
1015 let ts_now = self.clock.borrow().timestamp_ns();
1017 let event = OrderReleased::new(
1018 order.trader_id(),
1019 order.strategy_id(),
1020 order.instrument_id(),
1021 order.client_order_id(),
1022 released_price.unwrap(),
1023 UUID4::new(),
1024 ts_now,
1025 ts_now,
1026 );
1027
1028 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1029 log::error!("Failed to apply order event: {}", e);
1030 }
1031
1032 if let Err(e) = self
1033 .cache
1034 .borrow_mut()
1035 .update_order(&OrderAny::Market(transformed))
1036 {
1037 log::error!("Failed to update order: {}", e);
1038 }
1039 self.manager.send_risk_event(OrderEventAny::Released(event));
1040
1041 log::info!("Releasing order {}", order.client_order_id());
1042
1043 self.msgbus.borrow().publish(
1045 &format!("events.order.{}", order.strategy_id()).into(),
1046 &OrderEventAny::Released(event),
1047 );
1048
1049 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1050 self.manager.send_algo_command(command, exec_algorithm_id);
1051 } else {
1052 self.manager
1053 .send_exec_command(TradingCommand::SubmitOrder(command));
1054 }
1055 } else {
1056 log::error!(
1057 "Cannot fill limit order: no matching core for instrument {}",
1058 trigger_instrument_id
1059 );
1060 }
1061 }
1062
1063 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1064 if let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) {
1065 let mut bid = None;
1066 let mut ask = None;
1067 let mut last = None;
1068
1069 if matching_core.is_bid_initialized {
1070 bid = matching_core.bid;
1071 }
1072 if matching_core.is_ask_initialized {
1073 ask = matching_core.ask;
1074 }
1075 if matching_core.is_last_initialized {
1076 last = matching_core.last;
1077 }
1078
1079 let quote_tick = self
1080 .cache
1081 .borrow()
1082 .quote(&matching_core.instrument_id)
1083 .copied();
1084 let trade_tick = self
1085 .cache
1086 .borrow()
1087 .trade(&matching_core.instrument_id)
1088 .copied();
1089
1090 if bid.is_none() && quote_tick.is_some() {
1091 bid = Some(quote_tick.unwrap().bid_price);
1092 }
1093 if ask.is_none() && quote_tick.is_some() {
1094 ask = Some(quote_tick.unwrap().ask_price);
1095 }
1096 if last.is_none() && trade_tick.is_some() {
1097 last = Some(trade_tick.unwrap().price);
1098 }
1099
1100 let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1101 trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1102 {
1103 (new_trigger_price, new_price)
1104 } else {
1105 log::warn!("Cannot calculate trailing stop order");
1106 return;
1107 };
1108
1109 let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1110 (None, None) => return, _ => (new_trigger_price, new_price),
1112 };
1113
1114 let ts_now = self.clock.borrow().timestamp_ns();
1116 let event = OrderUpdated::new(
1117 order.trader_id(),
1118 order.strategy_id(),
1119 order.instrument_id(),
1120 order.client_order_id(),
1121 order.quantity(),
1122 UUID4::new(),
1123 ts_now,
1124 ts_now,
1125 false,
1126 order.venue_order_id(),
1127 order.account_id(),
1128 new_price,
1129 new_trigger_price,
1130 );
1131
1132 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1133 log::error!("Failed to apply order event: {}", e);
1134 }
1135 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1136 log::error!("Failed to update order: {}", e);
1137 }
1138
1139 self.manager.send_risk_event(OrderEventAny::Updated(event));
1140 } else {
1141 log::error!(
1142 "Cannot update trailing stop order: no matching core for instrument {}",
1143 order.instrument_id()
1144 );
1145 }
1146 }
1147}