1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::RefCell,
23 cmp::min,
24 collections::HashMap,
25 fmt::Debug,
26 ops::{Add, Sub},
27 rc::Rc,
28};
29
30use chrono::TimeDelta;
31use nautilus_common::{
32 cache::Cache,
33 clock::Clock,
34 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
35 msgbus,
36};
37use nautilus_core::{UUID4, UnixNanos};
38use nautilus_model::{
39 data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
40 enums::{
41 AccountType, AggregationSource, AggressorSide, BarAggregation, BookType, ContingencyType,
42 LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
43 OrderStatus, OrderType, PriceType, TimeInForce,
44 },
45 events::{
46 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
47 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
48 },
49 identifiers::{
50 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
51 VenueOrderId,
52 },
53 instruments::{EXPIRING_INSTRUMENT_TYPES, Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
56 position::Position,
57 types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
58};
59use ustr::Ustr;
60
61use crate::{
62 matching_core::OrderMatchingCore,
63 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
64 models::{
65 fee::{FeeModel, FeeModelAny},
66 fill::FillModel,
67 },
68 trailing::trailing_stop_calculate,
69};
70
71pub struct OrderMatchingEngine {
73 pub venue: Venue,
75 pub instrument: InstrumentAny,
77 pub raw_id: u32,
79 pub book_type: BookType,
81 pub oms_type: OmsType,
83 pub account_type: AccountType,
85 pub market_status: MarketStatus,
87 pub config: OrderMatchingEngineConfig,
89 clock: Rc<RefCell<dyn Clock>>,
90 cache: Rc<RefCell<Cache>>,
91 book: OrderBook,
92 pub core: OrderMatchingCore,
93 fill_model: FillModel,
94 fee_model: FeeModelAny,
95 target_bid: Option<Price>,
96 target_ask: Option<Price>,
97 target_last: Option<Price>,
98 last_bar_bid: Option<Bar>,
99 last_bar_ask: Option<Bar>,
100 execution_bar_types: HashMap<InstrumentId, BarType>,
101 execution_bar_deltas: HashMap<BarType, TimeDelta>,
102 account_ids: HashMap<TraderId, AccountId>,
103 cached_filled_qty: HashMap<ClientOrderId, Quantity>,
104 ids_generator: IdsGenerator,
105}
106
107impl Debug for OrderMatchingEngine {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct(stringify!(OrderMatchingEngine))
110 .field("venue", &self.venue)
111 .field("instrument", &self.instrument.id())
112 .finish()
113 }
114}
115
116impl OrderMatchingEngine {
117 #[allow(clippy::too_many_arguments)]
119 pub fn new(
120 instrument: InstrumentAny,
121 raw_id: u32,
122 fill_model: FillModel,
123 fee_model: FeeModelAny,
124 book_type: BookType,
125 oms_type: OmsType,
126 account_type: AccountType,
127 clock: Rc<RefCell<dyn Clock>>,
128 cache: Rc<RefCell<Cache>>,
129 config: OrderMatchingEngineConfig,
130 ) -> Self {
131 let book = OrderBook::new(instrument.id(), book_type);
132 let core = OrderMatchingCore::new(
133 instrument.id(),
134 instrument.price_increment(),
135 None, None, None, );
139 let ids_generator = IdsGenerator::new(
140 instrument.id().venue,
141 oms_type,
142 raw_id,
143 config.use_random_ids,
144 config.use_position_ids,
145 cache.clone(),
146 );
147
148 Self {
149 venue: instrument.id().venue,
150 instrument,
151 raw_id,
152 fill_model,
153 fee_model,
154 book_type,
155 oms_type,
156 account_type,
157 clock,
158 cache,
159 book,
160 core,
161 market_status: MarketStatus::Open,
162 config,
163 target_bid: None,
164 target_ask: None,
165 target_last: None,
166 last_bar_bid: None,
167 last_bar_ask: None,
168 execution_bar_types: HashMap::new(),
169 execution_bar_deltas: HashMap::new(),
170 account_ids: HashMap::new(),
171 cached_filled_qty: HashMap::new(),
172 ids_generator,
173 }
174 }
175
176 pub fn reset(&mut self) {
182 self.book.clear(0, UnixNanos::default());
183 self.execution_bar_types.clear();
184 self.execution_bar_deltas.clear();
185 self.account_ids.clear();
186 self.cached_filled_qty.clear();
187 self.core.reset();
188 self.target_bid = None;
189 self.target_ask = None;
190 self.target_last = None;
191 self.ids_generator.reset();
192
193 log::info!("Reset {}", self.instrument.id());
194 }
195
196 pub const fn set_fill_model(&mut self, fill_model: FillModel) {
198 self.fill_model = fill_model;
199 }
200
201 #[must_use]
202 pub fn best_bid_price(&self) -> Option<Price> {
204 self.book.best_bid_price()
205 }
206
207 #[must_use]
208 pub fn best_ask_price(&self) -> Option<Price> {
210 self.book.best_ask_price()
211 }
212
213 #[must_use]
214 pub const fn get_book(&self) -> &OrderBook {
216 &self.book
217 }
218
219 #[must_use]
220 pub const fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
222 self.core.get_orders_bid()
223 }
224
225 #[must_use]
226 pub const fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
228 self.core.get_orders_ask()
229 }
230
231 #[must_use]
232 pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
234 let mut orders = Vec::new();
236 orders.extend_from_slice(self.core.get_orders_bid());
237 orders.extend_from_slice(self.core.get_orders_ask());
238 orders
239 }
240
241 #[must_use]
242 pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
244 self.core.order_exists(client_order_id)
245 }
246
247 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) {
251 log::debug!("Processing {delta}");
252
253 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
254 self.book.apply_delta(delta);
255 }
256
257 self.iterate(delta.ts_event);
258 }
259
260 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
261 log::debug!("Processing {deltas}");
262
263 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
264 self.book.apply_deltas(deltas);
265 }
266
267 self.iterate(deltas.ts_event);
268 }
269
270 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
274 log::debug!("Processing {quote}");
275
276 if self.book_type == BookType::L1_MBP {
277 self.book.update_quote_tick(quote).unwrap();
278 }
279
280 self.iterate(quote.ts_event);
281 }
282
283 pub fn process_bar(&mut self, bar: &Bar) {
287 log::debug!("Processing {bar}");
288
289 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
291 return;
292 }
293
294 let bar_type = bar.bar_type;
295 if bar_type.aggregation_source() == AggregationSource::Internal {
297 return;
298 }
299
300 if bar_type.spec().aggregation == BarAggregation::Month {
302 return;
303 }
304
305 let execution_bar_type =
306 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
307 execution_bar_type.to_owned()
308 } else {
309 self.execution_bar_types
310 .insert(bar.instrument_id(), bar_type);
311 self.execution_bar_deltas
312 .insert(bar_type, bar_type.spec().timedelta());
313 bar_type
314 };
315
316 if execution_bar_type != bar_type {
317 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
318 if bar_type_timedelta.is_none() {
319 bar_type_timedelta = Some(bar_type.spec().timedelta());
320 self.execution_bar_deltas
321 .insert(bar_type, bar_type_timedelta.unwrap());
322 }
323 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
324 >= &bar_type_timedelta.unwrap()
325 {
326 self.execution_bar_types
327 .insert(bar_type.instrument_id(), bar_type);
328 } else {
329 return;
330 }
331 }
332
333 match bar_type.spec().price_type {
334 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
335 PriceType::Bid => {
336 self.last_bar_bid = Some(bar.to_owned());
337 self.process_quote_ticks_from_bar(bar);
338 }
339 PriceType::Ask => {
340 self.last_bar_ask = Some(bar.to_owned());
341 self.process_quote_ticks_from_bar(bar);
342 }
343 PriceType::Mark => panic!("Not implemented"),
344 }
345 }
346
347 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
348 let size = Quantity::new(bar.volume.as_f64() / 4.0, bar.volume.precision);
350 let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
351 {
352 AggressorSide::Buyer
353 } else {
354 AggressorSide::Seller
355 };
356
357 let mut trade_tick = TradeTick::new(
359 bar.instrument_id(),
360 bar.open,
361 size,
362 aggressor_side,
363 self.ids_generator.generate_trade_id(),
364 bar.ts_event,
365 bar.ts_event,
366 );
367
368 if !self.core.is_last_initialized {
371 self.book.update_trade_tick(&trade_tick).unwrap();
372 self.iterate(trade_tick.ts_init);
373 self.core.set_last_raw(trade_tick.price);
374 }
375
376 if self.core.last.is_some_and(|last| bar.high > last) {
379 trade_tick.price = bar.high;
380 trade_tick.aggressor_side = AggressorSide::Buyer;
381 trade_tick.trade_id = self.ids_generator.generate_trade_id();
382
383 self.book.update_trade_tick(&trade_tick).unwrap();
384 self.iterate(trade_tick.ts_init);
385
386 self.core.set_last_raw(trade_tick.price);
387 }
388
389 if self.core.last.is_some_and(|last| bar.low < last) {
393 trade_tick.price = bar.low;
394 trade_tick.aggressor_side = AggressorSide::Seller;
395 trade_tick.trade_id = self.ids_generator.generate_trade_id();
396
397 self.book.update_trade_tick(&trade_tick).unwrap();
398 self.iterate(trade_tick.ts_init);
399
400 self.core.set_last_raw(trade_tick.price);
401 }
402
403 if self.core.last.is_some_and(|last| bar.close != last) {
408 trade_tick.price = bar.close;
409 trade_tick.aggressor_side = if bar.close > self.core.last.unwrap() {
410 AggressorSide::Buyer
411 } else {
412 AggressorSide::Seller
413 };
414 trade_tick.trade_id = self.ids_generator.generate_trade_id();
415
416 self.book.update_trade_tick(&trade_tick).unwrap();
417 self.iterate(trade_tick.ts_init);
418
419 self.core.set_last_raw(trade_tick.price);
420 }
421 }
422
423 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
424 if self.last_bar_bid.is_none()
426 || self.last_bar_ask.is_none()
427 || self.last_bar_bid.unwrap().ts_event != self.last_bar_ask.unwrap().ts_event
428 {
429 return;
430 }
431 let bid_bar = self.last_bar_bid.unwrap();
432 let ask_bar = self.last_bar_ask.unwrap();
433 let bid_size = Quantity::new(bid_bar.volume.as_f64() / 4.0, bar.volume.precision);
434 let ask_size = Quantity::new(ask_bar.volume.as_f64() / 4.0, bar.volume.precision);
435
436 let mut quote_tick = QuoteTick::new(
438 self.book.instrument_id,
439 bid_bar.open,
440 ask_bar.open,
441 bid_size,
442 ask_size,
443 bid_bar.ts_init,
444 bid_bar.ts_init,
445 );
446
447 self.book.update_quote_tick("e_tick).unwrap();
449 self.iterate(quote_tick.ts_init);
450
451 quote_tick.bid_price = bid_bar.high;
453 quote_tick.ask_price = ask_bar.high;
454 self.book.update_quote_tick("e_tick).unwrap();
455 self.iterate(quote_tick.ts_init);
456
457 quote_tick.bid_price = bid_bar.low;
459 quote_tick.ask_price = ask_bar.low;
460 self.book.update_quote_tick("e_tick).unwrap();
461 self.iterate(quote_tick.ts_init);
462
463 quote_tick.bid_price = bid_bar.close;
465 quote_tick.ask_price = ask_bar.close;
466 self.book.update_quote_tick("e_tick).unwrap();
467 self.iterate(quote_tick.ts_init);
468
469 self.last_bar_bid = None;
471 self.last_bar_ask = None;
472 }
473
474 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
478 log::debug!("Processing {trade}");
479
480 if self.book_type == BookType::L1_MBP {
481 self.book.update_trade_tick(trade).unwrap();
482 }
483 self.core.set_last_raw(trade.price);
484
485 self.iterate(trade.ts_event);
486 }
487
488 pub fn process_status(&mut self, action: MarketStatusAction) {
489 log::debug!("Processing {action}");
490
491 if self.market_status == MarketStatus::Closed
493 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
494 {
495 self.market_status = MarketStatus::Open;
496 }
497 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
499 self.market_status = MarketStatus::Paused;
500 }
501 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
503 self.market_status = MarketStatus::Suspended;
504 }
505 if self.market_status == MarketStatus::Open
507 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
508 {
509 self.market_status = MarketStatus::Closed;
510 }
511 }
512
513 #[allow(clippy::needless_return)]
519 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
520 {
522 let cache_borrow = self.cache.as_ref().borrow();
523
524 if self.core.order_exists(order.client_order_id()) {
525 self.generate_order_rejected(order, "Order already exists".into());
526 return;
527 }
528
529 self.account_ids.insert(order.trader_id(), account_id);
531
532 if EXPIRING_INSTRUMENT_TYPES.contains(&self.instrument.instrument_class()) {
534 if let Some(activation_ns) = self.instrument.activation_ns()
535 && self.clock.borrow().timestamp_ns() < activation_ns
536 {
537 self.generate_order_rejected(
538 order,
539 format!(
540 "Contract {} is not yet active, activation {}",
541 self.instrument.id(),
542 self.instrument.activation_ns().unwrap()
543 )
544 .into(),
545 );
546 return;
547 }
548 if let Some(expiration_ns) = self.instrument.expiration_ns()
549 && self.clock.borrow().timestamp_ns() >= expiration_ns
550 {
551 self.generate_order_rejected(
552 order,
553 format!(
554 "Contract {} has expired, expiration {}",
555 self.instrument.id(),
556 self.instrument.expiration_ns().unwrap()
557 )
558 .into(),
559 );
560 return;
561 }
562 }
563
564 if self.config.support_contingent_orders {
566 if let Some(parent_order_id) = order.parent_order_id() {
567 let parent_order = cache_borrow.order(&parent_order_id);
568 if parent_order.is_none()
569 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
570 {
571 panic!("OTO parent not found");
572 }
573 if let Some(parent_order) = parent_order {
574 let parent_order_status = parent_order.status();
575 let order_is_open = order.is_open();
576 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
577 self.generate_order_rejected(
578 order,
579 format!("Rejected OTO order from {parent_order_id}").into(),
580 );
581 return;
582 } else if parent_order.status() == OrderStatus::Accepted
583 && parent_order.status() == OrderStatus::Triggered
584 {
585 log::info!(
586 "Pending OTO order {} triggers from {parent_order_id}",
587 order.client_order_id(),
588 );
589 return;
590 }
591 }
592 }
593
594 if let Some(linked_order_ids) = order.linked_order_ids() {
595 for client_order_id in linked_order_ids {
596 match cache_borrow.order(client_order_id) {
597 Some(contingent_order)
598 if (order.contingency_type().unwrap() == ContingencyType::Oco
599 || order.contingency_type().unwrap()
600 == ContingencyType::Ouo)
601 && !order.is_closed()
602 && contingent_order.is_closed() =>
603 {
604 self.generate_order_rejected(
605 order,
606 format!("Contingent order {client_order_id} already closed")
607 .into(),
608 );
609 return;
610 }
611 None => panic!("Cannot find contingent order for {client_order_id}"),
612 _ => {}
613 }
614 }
615 }
616 }
617
618 if order.quantity().precision != self.instrument.size_precision() {
620 self.generate_order_rejected(
621 order,
622 format!(
623 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
624 order.client_order_id(),
625 order.quantity().precision,
626 self.instrument.id(),
627 self.instrument.size_precision()
628 )
629 .into(),
630 );
631 return;
632 }
633
634 if let Some(price) = order.price()
636 && price.precision != self.instrument.price_precision()
637 {
638 self.generate_order_rejected(
639 order,
640 format!(
641 "Invalid order price precision for order {}, was {} when {} price precision is {}",
642 order.client_order_id(),
643 price.precision,
644 self.instrument.id(),
645 self.instrument.price_precision()
646 )
647 .into(),
648 );
649 return;
650 }
651
652 if let Some(trigger_price) = order.trigger_price()
654 && trigger_price.precision != self.instrument.price_precision()
655 {
656 self.generate_order_rejected(
657 order,
658 format!(
659 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
660 order.client_order_id(),
661 trigger_price.precision,
662 self.instrument.id(),
663 self.instrument.price_precision()
664 )
665 .into(),
666 );
667 return;
668 }
669
670 let position: Option<&Position> = cache_borrow
672 .position_for_order(&order.client_order_id())
673 .or_else(|| {
674 if self.oms_type == OmsType::Netting {
675 let position_id = PositionId::new(
676 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
677 );
678 cache_borrow.position(&position_id)
679 } else {
680 None
681 }
682 });
683
684 if order.order_side() == OrderSide::Sell
686 && self.account_type != AccountType::Margin
687 && matches!(self.instrument, InstrumentAny::Equity(_))
688 && (position.is_none()
689 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
690 {
691 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
692 self.generate_order_rejected(
693 order,
694 format!(
695 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
696 )
697 .into(),
698 );
699 return;
700 }
701
702 if self.config.use_reduce_only
704 && order.is_reduce_only()
705 && !order.is_closed()
706 && position.is_none_or(|pos| {
707 pos.is_closed()
708 || (order.is_buy() && pos.is_long())
709 || (order.is_sell() && pos.is_short())
710 })
711 {
712 self.generate_order_rejected(
713 order,
714 format!(
715 "Reduce-only order {} ({}-{}) would have increased position",
716 order.client_order_id(),
717 order.order_type().to_string().to_uppercase(),
718 order.order_side().to_string().to_uppercase()
719 )
720 .into(),
721 );
722 return;
723 }
724 }
725
726 match order.order_type() {
727 OrderType::Market => self.process_market_order(order),
728 OrderType::Limit => self.process_limit_order(order),
729 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
730 OrderType::StopMarket => self.process_stop_market_order(order),
731 OrderType::StopLimit => self.process_stop_limit_order(order),
732 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
733 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
734 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
735 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
736 }
737 }
738
739 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
740 if let Some(order) = self.core.get_order(command.client_order_id) {
741 self.update_order(
742 &mut order.to_any(),
743 command.quantity,
744 command.price,
745 command.trigger_price,
746 None,
747 );
748 } else {
749 self.generate_order_modify_rejected(
750 command.trader_id,
751 command.strategy_id,
752 command.instrument_id,
753 command.client_order_id,
754 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
755 Some(command.venue_order_id),
756 Some(account_id),
757 );
758 }
759 }
760
761 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
762 match self.core.get_order(command.client_order_id) {
763 Some(passive_order) => {
764 if passive_order.is_inflight() || passive_order.is_open() {
765 self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
766 }
767 }
768 None => self.generate_order_cancel_rejected(
769 command.trader_id,
770 command.strategy_id,
771 account_id,
772 command.instrument_id,
773 command.client_order_id,
774 command.venue_order_id,
775 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
776 ),
777 }
778 }
779
780 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
781 let open_orders = self
782 .cache
783 .borrow()
784 .orders_open(None, Some(&command.instrument_id), None, None)
785 .into_iter()
786 .cloned()
787 .collect::<Vec<OrderAny>>();
788 for order in open_orders {
789 if command.order_side != OrderSide::NoOrderSide
790 && command.order_side != order.order_side()
791 {
792 continue;
793 }
794 if order.is_inflight() || order.is_open() {
795 self.cancel_order(&order, None);
796 }
797 }
798 }
799
800 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
801 for order in &command.cancels {
802 self.process_cancel(order, account_id);
803 }
804 }
805
806 fn process_market_order(&mut self, order: &mut OrderAny) {
807 if order.time_in_force() == TimeInForce::AtTheOpen
808 || order.time_in_force() == TimeInForce::AtTheClose
809 {
810 log::error!(
811 "Market auction for the time in force {} is currently not supported",
812 order.time_in_force()
813 );
814 return;
815 }
816
817 let order_side = order.order_side();
819 let is_ask_initialized = self.core.is_ask_initialized;
820 let is_bid_initialized = self.core.is_bid_initialized;
821 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
822 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
823 {
824 self.generate_order_rejected(
825 order,
826 format!("No market for {}", order.instrument_id()).into(),
827 );
828 return;
829 }
830
831 self.fill_market_order(order);
832 }
833
834 fn process_limit_order(&mut self, order: &mut OrderAny) {
835 let limit_px = order.price().expect("Limit order must have a price");
836 if order.is_post_only()
837 && self
838 .core
839 .is_limit_matched(order.order_side_specified(), limit_px)
840 {
841 self.generate_order_rejected(
842 order,
843 format!(
844 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
845 order.order_type(),
846 order.order_side(),
847 order.price().unwrap(),
848 self.core
849 .bid
850 .map_or_else(|| "None".to_string(), |p| p.to_string()),
851 self.core
852 .ask
853 .map_or_else(|| "None".to_string(), |p| p.to_string())
854 )
855 .into(),
856 );
857 return;
858 }
859
860 self.accept_order(order);
862
863 if self
865 .core
866 .is_limit_matched(order.order_side_specified(), limit_px)
867 {
868 if order.liquidity_side().is_some()
870 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
871 {
872 order.set_liquidity_side(LiquiditySide::Taker);
873 }
874 self.fill_limit_order(order);
875 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
876 self.cancel_order(order, None);
877 }
878 }
879
880 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
881 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
883 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
884 {
885 self.generate_order_rejected(
886 order,
887 format!("No market for {}", order.instrument_id()).into(),
888 );
889 return;
890 }
891
892 self.fill_market_order(order);
894
895 if order.is_open() {
896 self.accept_order(order);
897 }
898 }
899
900 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
901 let stop_px = order
902 .trigger_price()
903 .expect("Stop order must have a trigger price");
904 if self
905 .core
906 .is_stop_matched(order.order_side_specified(), stop_px)
907 {
908 if self.config.reject_stop_orders {
909 self.generate_order_rejected(
910 order,
911 format!(
912 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
913 order.order_type(),
914 order.order_side(),
915 order.trigger_price().unwrap(),
916 self.core
917 .bid
918 .map_or_else(|| "None".to_string(), |p| p.to_string()),
919 self.core
920 .ask
921 .map_or_else(|| "None".to_string(), |p| p.to_string())
922 ).into(),
923 );
924 return;
925 }
926 self.fill_market_order(order);
927 return;
928 }
929
930 self.accept_order(order);
932 }
933
934 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
935 let stop_px = order
936 .trigger_price()
937 .expect("Stop order must have a trigger price");
938 if self
939 .core
940 .is_stop_matched(order.order_side_specified(), stop_px)
941 {
942 if self.config.reject_stop_orders {
943 self.generate_order_rejected(
944 order,
945 format!(
946 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
947 order.order_type(),
948 order.order_side(),
949 order.trigger_price().unwrap(),
950 self.core
951 .bid
952 .map_or_else(|| "None".to_string(), |p| p.to_string()),
953 self.core
954 .ask
955 .map_or_else(|| "None".to_string(), |p| p.to_string())
956 ).into(),
957 );
958 return;
959 }
960
961 self.accept_order(order);
962 self.generate_order_triggered(order);
963
964 let limit_px = order.price().expect("Stop limit order must have a price");
966 if self
967 .core
968 .is_limit_matched(order.order_side_specified(), limit_px)
969 {
970 order.set_liquidity_side(LiquiditySide::Taker);
971 self.fill_limit_order(order);
972 }
973 }
974
975 self.accept_order(order);
977 }
978
979 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
980 if self
981 .core
982 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
983 {
984 if self.config.reject_stop_orders {
985 self.generate_order_rejected(
986 order,
987 format!(
988 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
989 order.order_type(),
990 order.order_side(),
991 order.trigger_price().unwrap(),
992 self.core
993 .bid
994 .map_or_else(|| "None".to_string(), |p| p.to_string()),
995 self.core
996 .ask
997 .map_or_else(|| "None".to_string(), |p| p.to_string())
998 ).into(),
999 );
1000 return;
1001 }
1002 self.fill_market_order(order);
1003 return;
1004 }
1005
1006 self.accept_order(order);
1008 }
1009
1010 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1011 if self
1012 .core
1013 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1014 {
1015 if self.config.reject_stop_orders {
1016 self.generate_order_rejected(
1017 order,
1018 format!(
1019 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1020 order.order_type(),
1021 order.order_side(),
1022 order.trigger_price().unwrap(),
1023 self.core
1024 .bid
1025 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1026 self.core
1027 .ask
1028 .map_or_else(|| "None".to_string(), |p| p.to_string())
1029 ).into(),
1030 );
1031 return;
1032 }
1033 self.accept_order(order);
1034 self.generate_order_triggered(order);
1035
1036 if self
1038 .core
1039 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1040 {
1041 order.set_liquidity_side(LiquiditySide::Taker);
1042 self.fill_limit_order(order);
1043 }
1044 return;
1045 }
1046
1047 self.accept_order(order);
1049 }
1050
1051 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1052 if let Some(trigger_price) = order.trigger_price()
1053 && self
1054 .core
1055 .is_stop_matched(order.order_side_specified(), trigger_price)
1056 {
1057 self.generate_order_rejected(
1058 order,
1059 format!(
1060 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1061 order.order_type(),
1062 order.order_side(),
1063 trigger_price,
1064 self.core
1065 .bid
1066 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1067 self.core
1068 .ask
1069 .map_or_else(|| "None".to_string(), |p| p.to_string())
1070 ).into(),
1071 );
1072 return;
1073 }
1074
1075 self.accept_order(order);
1077 }
1078
1079 pub fn iterate(&mut self, timestamp_ns: UnixNanos) {
1088 if self.book.has_bid() {
1092 self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1093 }
1094 if self.book.has_ask() {
1095 self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1096 }
1097 self.core.iterate();
1098
1099 self.core.bid = self.book.best_bid_price();
1100 self.core.ask = self.book.best_ask_price();
1101
1102 let orders_bid = self.core.get_orders_bid().to_vec();
1103 let orders_ask = self.core.get_orders_ask().to_vec();
1104
1105 self.iterate_orders(timestamp_ns, &orders_bid);
1106 self.iterate_orders(timestamp_ns, &orders_ask);
1107 }
1108
1109 fn maybe_activate_trailing_stop(
1110 &mut self,
1111 order: &mut OrderAny,
1112 bid: Option<Price>,
1113 ask: Option<Price>,
1114 ) -> bool {
1115 match order {
1116 OrderAny::TrailingStopMarket(inner) => {
1117 if inner.is_activated {
1118 return true;
1119 }
1120
1121 if inner.activation_price.is_none() {
1122 let px = match inner.order_side() {
1123 OrderSide::Buy => ask,
1124 OrderSide::Sell => bid,
1125 _ => None,
1126 };
1127 if let Some(p) = px {
1128 inner.activation_price = Some(p);
1129 inner.set_activated();
1130 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1131 log::error!("Failed to update order: {e}");
1132 }
1133 return true;
1134 }
1135 return false;
1136 }
1137
1138 let activation_price = inner.activation_price.unwrap();
1139 let hit = match inner.order_side() {
1140 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1141 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1142 _ => false,
1143 };
1144 if hit {
1145 inner.set_activated();
1146 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1147 log::error!("Failed to update order: {e}");
1148 }
1149 }
1150 hit
1151 }
1152 OrderAny::TrailingStopLimit(inner) => {
1153 if inner.is_activated {
1154 return true;
1155 }
1156
1157 if inner.activation_price.is_none() {
1158 let px = match inner.order_side() {
1159 OrderSide::Buy => ask,
1160 OrderSide::Sell => bid,
1161 _ => None,
1162 };
1163 if let Some(p) = px {
1164 inner.activation_price = Some(p);
1165 inner.set_activated();
1166 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1167 log::error!("Failed to update order: {e}");
1168 }
1169 return true;
1170 }
1171 return false;
1172 }
1173
1174 let activation_price = inner.activation_price.unwrap();
1175 let hit = match inner.order_side() {
1176 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1177 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1178 _ => false,
1179 };
1180 if hit {
1181 inner.set_activated();
1182 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1183 log::error!("Failed to update order: {e}");
1184 }
1185 }
1186 hit
1187 }
1188 _ => true,
1189 }
1190 }
1191
1192 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1193 for order in orders {
1194 if order.is_closed() {
1195 continue;
1196 }
1197
1198 if self.config.support_gtd_orders
1199 && order
1200 .expire_time()
1201 .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
1202 {
1203 let _ = self.core.delete_order(order);
1204 self.cached_filled_qty.remove(&order.client_order_id());
1205 self.expire_order(order);
1206 continue;
1207 }
1208
1209 if matches!(
1210 order,
1211 PassiveOrderAny::Stop(
1212 StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_)
1213 )
1214 ) {
1215 let mut any = OrderAny::from(order.clone());
1216
1217 if !self.maybe_activate_trailing_stop(&mut any, self.core.bid, self.core.ask) {
1218 continue;
1219 }
1220
1221 self.update_trailing_stop_order(&mut any);
1222 }
1223
1224 if let Some(target_bid) = self.target_bid {
1226 self.core.bid = Some(target_bid);
1227 self.target_bid = None;
1228 }
1229 if let Some(target_bid) = self.target_bid.take() {
1230 self.core.bid = Some(target_bid);
1231 self.target_bid = None;
1232 }
1233 if let Some(target_ask) = self.target_ask.take() {
1234 self.core.ask = Some(target_ask);
1235 self.target_ask = None;
1236 }
1237 if let Some(target_last) = self.target_last.take() {
1238 self.core.last = Some(target_last);
1239 self.target_last = None;
1240 }
1241 }
1242
1243 self.target_bid = None;
1245 self.target_ask = None;
1246 self.target_last = None;
1247 }
1248
1249 fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1250 match order.price() {
1251 Some(order_price) => {
1252 let book_order =
1254 BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
1255
1256 let mut fills = self.book.simulate_fills(&book_order);
1257
1258 if fills.is_empty() {
1260 return fills;
1261 }
1262
1263 if let Some(triggered_price) = order.trigger_price() {
1265 if order
1267 .liquidity_side()
1268 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
1269 {
1270 if order.order_side() == OrderSide::Sell && order_price > triggered_price {
1271 let first_fill = fills.first().unwrap();
1273 let triggered_qty = first_fill.1;
1274 fills[0] = (triggered_price, triggered_qty);
1275 self.target_bid = self.core.bid;
1276 self.target_ask = self.core.ask;
1277 self.target_last = self.core.last;
1278 self.core.set_ask_raw(order_price);
1279 self.core.set_last_raw(order_price);
1280 } else if order.order_side() == OrderSide::Buy
1281 && order_price < triggered_price
1282 {
1283 let first_fill = fills.first().unwrap();
1285 let triggered_qty = first_fill.1;
1286 fills[0] = (triggered_price, triggered_qty);
1287 self.target_bid = self.core.bid;
1288 self.target_ask = self.core.ask;
1289 self.target_last = self.core.last;
1290 self.core.set_bid_raw(order_price);
1291 self.core.set_last_raw(order_price);
1292 }
1293 }
1294 }
1295
1296 if order
1298 .liquidity_side()
1299 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1300 {
1301 match order.order_side().as_specified() {
1302 OrderSideSpecified::Buy => {
1303 let target_price = if order
1304 .trigger_price()
1305 .is_some_and(|trigger_price| order_price > trigger_price)
1306 {
1307 order.trigger_price().unwrap()
1308 } else {
1309 order_price
1310 };
1311 for fill in &fills {
1312 let last_px = fill.0;
1313 if last_px < order_price {
1314 self.target_bid = self.core.bid;
1316 self.target_ask = self.core.ask;
1317 self.target_last = self.core.last;
1318 self.core.set_ask_raw(target_price);
1319 self.core.set_last_raw(target_price);
1320 }
1321 }
1322 }
1323 OrderSideSpecified::Sell => {
1324 let target_price = if order
1325 .trigger_price()
1326 .is_some_and(|trigger_price| order_price < trigger_price)
1327 {
1328 order.trigger_price().unwrap()
1329 } else {
1330 order_price
1331 };
1332 for fill in &fills {
1333 let last_px = fill.0;
1334 if last_px > order_price {
1335 self.target_bid = self.core.bid;
1337 self.target_ask = self.core.ask;
1338 self.target_last = self.core.last;
1339 self.core.set_bid_raw(target_price);
1340 self.core.set_last_raw(target_price);
1341 }
1342 }
1343 }
1344 }
1345 }
1346
1347 fills
1348 }
1349 None => panic!("Limit order must have a price"),
1350 }
1351 }
1352
1353 fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1354 let price = match order.order_side().as_specified() {
1356 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1357 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1358 };
1359
1360 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1362 self.book.simulate_fills(&book_order)
1363 }
1364
1365 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1366 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
1367 && filled_qty >= &order.quantity()
1368 {
1369 log::info!(
1370 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1371 filled_qty,
1372 order.quantity(),
1373 order.filled_qty(),
1374 order.quantity()
1375 );
1376 return;
1377 }
1378
1379 let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1380 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1381 let cache = self.cache.as_ref().borrow();
1382 cache.position(&venue_position_id).cloned()
1383 } else {
1384 None
1385 };
1386
1387 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1388 log::warn!(
1389 "Canceling REDUCE_ONLY {} as would increase position",
1390 order.order_type()
1391 );
1392 self.cancel_order(order, None);
1393 return;
1394 }
1395 order.set_liquidity_side(LiquiditySide::Taker);
1397 let fills = self.determine_market_price_and_volume(order);
1398 self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1399 }
1400
1401 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1405 match order.price() {
1406 Some(order_price) => {
1407 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1408 if cached_filled_qty.is_some() && *cached_filled_qty.unwrap() >= order.quantity() {
1409 log::debug!(
1410 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1411 cached_filled_qty.unwrap(),
1412 order.quantity(),
1413 order.filled_qty(),
1414 order.leaves_qty(),
1415 );
1416 return;
1417 }
1418
1419 if order
1420 .liquidity_side()
1421 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1422 {
1423 if order.order_side() == OrderSide::Buy
1424 && self.core.bid.is_some_and(|bid| bid == order_price)
1425 && !self.fill_model.is_limit_filled()
1426 {
1427 return;
1429 }
1430 if order.order_side() == OrderSide::Sell
1431 && self.core.ask.is_some_and(|ask| ask == order_price)
1432 && !self.fill_model.is_limit_filled()
1433 {
1434 return;
1436 }
1437 }
1438
1439 let venue_position_id = self.ids_generator.get_position_id(order, None);
1440 let position = if let Some(venue_position_id) = venue_position_id {
1441 let cache = self.cache.as_ref().borrow();
1442 cache.position(&venue_position_id).cloned()
1443 } else {
1444 None
1445 };
1446
1447 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1448 log::warn!(
1449 "Canceling REDUCE_ONLY {} as would increase position",
1450 order.order_type()
1451 );
1452 self.cancel_order(order, None);
1453 return;
1454 }
1455
1456 let fills = self.determine_limit_price_and_volume(order);
1457
1458 self.apply_fills(
1459 order,
1460 fills,
1461 order.liquidity_side().unwrap(),
1462 venue_position_id,
1463 position,
1464 );
1465 }
1466 None => panic!("Limit order must have a price"),
1467 }
1468 }
1469
1470 fn apply_fills(
1471 &mut self,
1472 order: &mut OrderAny,
1473 fills: Vec<(Price, Quantity)>,
1474 liquidity_side: LiquiditySide,
1475 venue_position_id: Option<PositionId>,
1476 position: Option<Position>,
1477 ) {
1478 if order.time_in_force() == TimeInForce::Fok {
1479 let mut total_size = Quantity::zero(order.quantity().precision);
1480 for (fill_px, fill_qty) in &fills {
1481 total_size = total_size.add(*fill_qty);
1482 }
1483
1484 if order.leaves_qty() > total_size {
1485 self.cancel_order(order, None);
1486 return;
1487 }
1488 }
1489
1490 if fills.is_empty() {
1491 if order.status() == OrderStatus::Submitted {
1492 self.generate_order_rejected(
1493 order,
1494 format!("No market for {}", order.instrument_id()).into(),
1495 );
1496 } else {
1497 log::error!(
1498 "Cannot fill order: no fills from book when fills were expected (check size in data)"
1499 );
1500 return;
1501 }
1502 }
1503
1504 if self.oms_type == OmsType::Netting {
1505 let venue_position_id: Option<PositionId> = None;
1506 }
1507
1508 let mut initial_market_to_limit_fill = false;
1509 for &(mut fill_px, ref fill_qty) in &fills {
1510 assert!(
1512 (fill_px.precision == self.instrument.price_precision()),
1513 "Invalid price precision for fill price {} when instrument price precision is {}.\
1514 Check that the data price precision matches the {} instrument",
1515 fill_px.precision,
1516 self.instrument.price_precision(),
1517 self.instrument.id()
1518 );
1519
1520 assert!(
1522 (fill_qty.precision == self.instrument.size_precision()),
1523 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
1524 Check that the data quantity precision matches the {} instrument",
1525 fill_qty.precision,
1526 self.instrument.size_precision(),
1527 self.instrument.id()
1528 );
1529
1530 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
1531 && order.order_type() == OrderType::MarketToLimit
1532 {
1533 self.generate_order_updated(order, order.quantity(), Some(fill_px), None);
1534 initial_market_to_limit_fill = true;
1535 }
1536
1537 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
1538 fill_px = match order.order_side().as_specified() {
1539 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
1540 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
1541 }
1542 }
1543
1544 if self.config.use_reduce_only
1546 && order.is_reduce_only()
1547 && let Some(position) = &position
1548 && *fill_qty > position.quantity
1549 {
1550 if position.quantity == Quantity::zero(position.quantity.precision) {
1551 return;
1553 }
1554
1555 let adjusted_fill_qty =
1557 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
1558
1559 self.generate_order_updated(order, adjusted_fill_qty, None, None);
1560 }
1561
1562 if fill_qty.is_zero() {
1563 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
1564 self.generate_order_rejected(
1565 order,
1566 format!("No market for {}", order.instrument_id()).into(),
1567 );
1568 }
1569 return;
1570 }
1571
1572 self.fill_order(
1573 order,
1574 fill_px,
1575 *fill_qty,
1576 liquidity_side,
1577 venue_position_id,
1578 position.clone(),
1579 );
1580
1581 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
1582 return;
1584 }
1585 }
1586
1587 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
1588 self.cancel_order(order, None);
1590 return;
1591 }
1592
1593 if order.is_open()
1594 && self.book_type == BookType::L1_MBP
1595 && matches!(
1596 order.order_type(),
1597 OrderType::Market
1598 | OrderType::MarketIfTouched
1599 | OrderType::StopMarket
1600 | OrderType::TrailingStopMarket
1601 )
1602 {
1603 todo!("Exhausted simulated book volume")
1607 }
1608 }
1609
1610 fn fill_order(
1611 &mut self,
1612 order: &mut OrderAny,
1613 last_px: Price,
1614 last_qty: Quantity,
1615 liquidity_side: LiquiditySide,
1616 venue_position_id: Option<PositionId>,
1617 position: Option<Position>,
1618 ) {
1619 match self.cached_filled_qty.get(&order.client_order_id()) {
1620 Some(filled_qty) => {
1621 let leaves_qty = order.quantity() - *filled_qty;
1622 let last_qty = min(last_qty, leaves_qty);
1623 let new_filled_qty = *filled_qty + last_qty;
1624 self.cached_filled_qty
1626 .insert(order.client_order_id(), new_filled_qty);
1627 }
1628 None => {
1629 self.cached_filled_qty
1630 .insert(order.client_order_id(), last_qty);
1631 }
1632 }
1633
1634 let commission = self
1636 .fee_model
1637 .get_commission(order, last_qty, last_px, &self.instrument)
1638 .unwrap();
1639
1640 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1641 self.generate_order_filled(
1642 order,
1643 venue_order_id,
1644 venue_position_id,
1645 last_qty,
1646 last_px,
1647 self.instrument.quote_currency(),
1648 commission,
1649 liquidity_side,
1650 );
1651
1652 if order.is_passive() && order.is_closed() {
1653 if self.core.order_exists(order.client_order_id()) {
1655 let _ = self
1656 .core
1657 .delete_order(&PassiveOrderAny::from(order.clone()));
1658 }
1659 self.cached_filled_qty.remove(&order.client_order_id());
1660 }
1661
1662 if !self.config.support_contingent_orders {
1663 return;
1664 }
1665
1666 if let Some(contingency_type) = order.contingency_type() {
1667 match contingency_type {
1668 ContingencyType::Oto => {
1669 if let Some(linked_orders_ids) = order.linked_order_ids() {
1670 for client_order_id in linked_orders_ids {
1671 let mut child_order = match self.cache.borrow().order(client_order_id) {
1672 Some(child_order) => child_order.clone(),
1673 None => panic!("Order {client_order_id} not found in cache"),
1674 };
1675
1676 if child_order.is_closed() || child_order.is_active_local() {
1677 continue;
1678 }
1679
1680 if let (None, Some(position_id)) =
1682 (child_order.position_id(), order.position_id())
1683 {
1684 self.cache
1685 .borrow_mut()
1686 .add_position_id(
1687 &position_id,
1688 &self.venue,
1689 client_order_id,
1690 &child_order.strategy_id(),
1691 )
1692 .unwrap();
1693 log::debug!(
1694 "Added position id {position_id} to cache for order {client_order_id}"
1695 );
1696 }
1697
1698 if (!child_order.is_open())
1699 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
1700 && child_order
1701 .previous_status()
1702 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
1703 {
1704 let account_id = order.account_id().unwrap_or_else(|| {
1705 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
1706 panic!(
1707 "Account ID not found for trader {}",
1708 order.trader_id()
1709 )
1710 })
1711 });
1712 self.process_order(&mut child_order, account_id);
1713 }
1714 }
1715 } else {
1716 log::error!(
1717 "OTO order {} does not have linked orders",
1718 order.client_order_id()
1719 );
1720 }
1721 }
1722 ContingencyType::Oco => {
1723 if let Some(linked_orders_ids) = order.linked_order_ids() {
1724 for client_order_id in linked_orders_ids {
1725 let child_order = match self.cache.borrow().order(client_order_id) {
1726 Some(child_order) => child_order.clone(),
1727 None => panic!("Order {client_order_id} not found in cache"),
1728 };
1729
1730 if child_order.is_closed() || child_order.is_active_local() {
1731 continue;
1732 }
1733
1734 self.cancel_order(&child_order, None);
1735 }
1736 } else {
1737 log::error!(
1738 "OCO order {} does not have linked orders",
1739 order.client_order_id()
1740 );
1741 }
1742 }
1743 ContingencyType::Ouo => {
1744 if let Some(linked_orders_ids) = order.linked_order_ids() {
1745 for client_order_id in linked_orders_ids {
1746 let mut child_order = match self.cache.borrow().order(client_order_id) {
1747 Some(child_order) => child_order.clone(),
1748 None => panic!("Order {client_order_id} not found in cache"),
1749 };
1750
1751 if child_order.is_active_local() {
1752 continue;
1753 }
1754
1755 if order.is_closed() && child_order.is_open() {
1756 self.cancel_order(&child_order, None);
1757 } else if !order.leaves_qty().is_zero()
1758 && order.leaves_qty() != child_order.leaves_qty()
1759 {
1760 let price = child_order.price();
1761 let trigger_price = child_order.trigger_price();
1762 self.update_order(
1763 &mut child_order,
1764 Some(order.leaves_qty()),
1765 price,
1766 trigger_price,
1767 Some(false),
1768 );
1769 }
1770 }
1771 } else {
1772 log::error!(
1773 "OUO order {} does not have linked orders",
1774 order.client_order_id()
1775 );
1776 }
1777 }
1778 _ => {}
1779 }
1780 }
1781 }
1782
1783 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
1784 if self
1785 .core
1786 .is_limit_matched(order.order_side_specified(), price)
1787 {
1788 if order.is_post_only() {
1789 self.generate_order_modify_rejected(
1790 order.trader_id(),
1791 order.strategy_id(),
1792 order.instrument_id(),
1793 order.client_order_id(),
1794 Ustr::from(format!(
1795 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1796 order.order_type(),
1797 order.order_side(),
1798 price,
1799 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1800 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1801 ).as_str()),
1802 order.venue_order_id(),
1803 order.account_id(),
1804 );
1805 return;
1806 }
1807
1808 self.generate_order_updated(order, quantity, Some(price), None);
1809 order.set_liquidity_side(LiquiditySide::Taker);
1810 self.fill_limit_order(order);
1811 return;
1812 }
1813 self.generate_order_updated(order, quantity, Some(price), None);
1814 }
1815
1816 fn update_stop_market_order(
1817 &mut self,
1818 order: &mut OrderAny,
1819 quantity: Quantity,
1820 trigger_price: Price,
1821 ) {
1822 if self
1823 .core
1824 .is_stop_matched(order.order_side_specified(), trigger_price)
1825 {
1826 self.generate_order_modify_rejected(
1827 order.trader_id(),
1828 order.strategy_id(),
1829 order.instrument_id(),
1830 order.client_order_id(),
1831 Ustr::from(
1832 format!(
1833 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1834 order.order_type(),
1835 order.order_side(),
1836 trigger_price,
1837 self.core
1838 .bid
1839 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1840 self.core
1841 .ask
1842 .map_or_else(|| "None".to_string(), |p| p.to_string())
1843 )
1844 .as_str(),
1845 ),
1846 order.venue_order_id(),
1847 order.account_id(),
1848 );
1849 return;
1850 }
1851
1852 self.generate_order_updated(order, quantity, None, Some(trigger_price));
1853 }
1854
1855 fn update_stop_limit_order(
1856 &mut self,
1857 order: &mut OrderAny,
1858 quantity: Quantity,
1859 price: Price,
1860 trigger_price: Price,
1861 ) {
1862 if order.is_triggered().is_some_and(|t| t) {
1863 if self
1865 .core
1866 .is_limit_matched(order.order_side_specified(), price)
1867 {
1868 if order.is_post_only() {
1869 self.generate_order_modify_rejected(
1870 order.trader_id(),
1871 order.strategy_id(),
1872 order.instrument_id(),
1873 order.client_order_id(),
1874 Ustr::from(format!(
1875 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1876 order.order_type(),
1877 order.order_side(),
1878 price,
1879 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1880 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1881 ).as_str()),
1882 order.venue_order_id(),
1883 order.account_id(),
1884 );
1885 return;
1886 }
1887 self.generate_order_updated(order, quantity, Some(price), None);
1888 order.set_liquidity_side(LiquiditySide::Taker);
1889 self.fill_limit_order(order);
1890 return; }
1892 } else {
1893 if self
1895 .core
1896 .is_stop_matched(order.order_side_specified(), trigger_price)
1897 {
1898 self.generate_order_modify_rejected(
1899 order.trader_id(),
1900 order.strategy_id(),
1901 order.instrument_id(),
1902 order.client_order_id(),
1903 Ustr::from(
1904 format!(
1905 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1906 order.order_type(),
1907 order.order_side(),
1908 trigger_price,
1909 self.core
1910 .bid
1911 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1912 self.core
1913 .ask
1914 .map_or_else(|| "None".to_string(), |p| p.to_string())
1915 )
1916 .as_str(),
1917 ),
1918 order.venue_order_id(),
1919 order.account_id(),
1920 );
1921 return;
1922 }
1923 }
1924
1925 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1926 }
1927
1928 fn update_market_if_touched_order(
1929 &mut self,
1930 order: &mut OrderAny,
1931 quantity: Quantity,
1932 trigger_price: Price,
1933 ) {
1934 if self
1935 .core
1936 .is_touch_triggered(order.order_side_specified(), trigger_price)
1937 {
1938 self.generate_order_modify_rejected(
1939 order.trader_id(),
1940 order.strategy_id(),
1941 order.instrument_id(),
1942 order.client_order_id(),
1943 Ustr::from(
1944 format!(
1945 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1946 order.order_type(),
1947 order.order_side(),
1948 trigger_price,
1949 self.core
1950 .bid
1951 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1952 self.core
1953 .ask
1954 .map_or_else(|| "None".to_string(), |p| p.to_string())
1955 )
1956 .as_str(),
1957 ),
1958 order.venue_order_id(),
1959 order.account_id(),
1960 );
1961 return;
1963 }
1964
1965 self.generate_order_updated(order, quantity, None, Some(trigger_price));
1966 }
1967
1968 fn update_limit_if_touched_order(
1969 &mut self,
1970 order: &mut OrderAny,
1971 quantity: Quantity,
1972 price: Price,
1973 trigger_price: Price,
1974 ) {
1975 if order.is_triggered().is_some_and(|t| t) {
1976 if self
1978 .core
1979 .is_limit_matched(order.order_side_specified(), price)
1980 {
1981 if order.is_post_only() {
1982 self.generate_order_modify_rejected(
1983 order.trader_id(),
1984 order.strategy_id(),
1985 order.instrument_id(),
1986 order.client_order_id(),
1987 Ustr::from(format!(
1988 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1989 order.order_type(),
1990 order.order_side(),
1991 price,
1992 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1993 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1994 ).as_str()),
1995 order.venue_order_id(),
1996 order.account_id(),
1997 );
1998 return;
2000 }
2001 self.generate_order_updated(order, quantity, Some(price), None);
2002 order.set_liquidity_side(LiquiditySide::Taker);
2003 self.fill_limit_order(order);
2004 return;
2005 }
2006 } else {
2007 if self
2009 .core
2010 .is_touch_triggered(order.order_side_specified(), trigger_price)
2011 {
2012 self.generate_order_modify_rejected(
2013 order.trader_id(),
2014 order.strategy_id(),
2015 order.instrument_id(),
2016 order.client_order_id(),
2017 Ustr::from(
2018 format!(
2019 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2020 order.order_type(),
2021 order.order_side(),
2022 trigger_price,
2023 self.core
2024 .bid
2025 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2026 self.core
2027 .ask
2028 .map_or_else(|| "None".to_string(), |p| p.to_string())
2029 )
2030 .as_str(),
2031 ),
2032 order.venue_order_id(),
2033 order.account_id(),
2034 );
2035 return;
2036 }
2037 }
2038
2039 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
2040 }
2041
2042 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
2043 let (new_trigger_price, new_price) = trailing_stop_calculate(
2044 self.instrument.price_increment(),
2045 order.trigger_price(),
2046 order.activation_price(),
2047 order,
2048 self.core.bid,
2049 self.core.ask,
2050 self.core.last,
2051 )
2052 .unwrap();
2053
2054 if new_trigger_price.is_none() && new_price.is_none() {
2055 return;
2056 }
2057
2058 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price);
2059 }
2060
2061 fn accept_order(&mut self, order: &mut OrderAny) {
2064 if order.is_closed() {
2065 return;
2067 }
2068 if order.status() != OrderStatus::Accepted {
2069 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2070 self.generate_order_accepted(order, venue_order_id);
2071
2072 if matches!(
2073 order.order_type(),
2074 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
2075 ) && order.trigger_price().is_none()
2076 {
2077 self.update_trailing_stop_order(order);
2078 }
2079 }
2080
2081 let _ = self.core.add_order(order.to_owned().into());
2082 }
2083
2084 fn expire_order(&mut self, order: &PassiveOrderAny) {
2085 if self.config.support_contingent_orders
2086 && order
2087 .contingency_type()
2088 .is_some_and(|c| c != ContingencyType::NoContingency)
2089 {
2090 self.cancel_contingent_orders(&OrderAny::from(order.clone()));
2091 }
2092
2093 self.generate_order_expired(&order.to_any());
2094 }
2095
2096 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
2097 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
2098 if order.is_active_local() {
2099 log::error!(
2100 "Cannot cancel an order with {} from the matching engine",
2101 order.status()
2102 );
2103 return;
2104 }
2105
2106 if self.core.order_exists(order.client_order_id()) {
2108 let _ = self
2109 .core
2110 .delete_order(&PassiveOrderAny::from(order.clone()));
2111 }
2112 self.cached_filled_qty.remove(&order.client_order_id());
2113
2114 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2115 self.generate_order_canceled(order, venue_order_id);
2116
2117 if self.config.support_contingent_orders
2118 && order.contingency_type().is_some()
2119 && order.contingency_type().unwrap() != ContingencyType::NoContingency
2120 && cancel_contingencies
2121 {
2122 self.cancel_contingent_orders(order);
2123 }
2124 }
2125
2126 fn update_order(
2127 &mut self,
2128 order: &mut OrderAny,
2129 quantity: Option<Quantity>,
2130 price: Option<Price>,
2131 trigger_price: Option<Price>,
2132 update_contingencies: Option<bool>,
2133 ) {
2134 let update_contingencies = update_contingencies.unwrap_or(true);
2135 let quantity = quantity.unwrap_or(order.quantity());
2136
2137 match order {
2138 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2139 let price = price.unwrap_or(order.price().unwrap());
2140 self.update_limit_order(order, quantity, price);
2141 }
2142 OrderAny::StopMarket(_) => {
2143 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2144 self.update_stop_market_order(order, quantity, trigger_price);
2145 }
2146 OrderAny::StopLimit(_) => {
2147 let price = price.unwrap_or(order.price().unwrap());
2148 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2149 self.update_stop_limit_order(order, quantity, price, trigger_price);
2150 }
2151 OrderAny::MarketIfTouched(_) => {
2152 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2153 self.update_market_if_touched_order(order, quantity, trigger_price);
2154 }
2155 OrderAny::LimitIfTouched(_) => {
2156 let price = price.unwrap_or(order.price().unwrap());
2157 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2158 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2159 }
2160 OrderAny::TrailingStopMarket(_) => {
2161 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2162 self.update_market_if_touched_order(order, quantity, trigger_price);
2163 }
2164 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2165 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2166 let trigger_price =
2167 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2168 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2169 }
2170 _ => {
2171 panic!(
2172 "Unsupported order type {} for update_order",
2173 order.order_type()
2174 );
2175 }
2176 }
2177
2178 if self.config.support_contingent_orders
2179 && order
2180 .contingency_type()
2181 .is_some_and(|c| c != ContingencyType::NoContingency)
2182 && update_contingencies
2183 {
2184 self.update_contingent_order(order);
2185 }
2186 }
2187
2188 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
2189 todo!("trigger_stop_order")
2190 }
2191
2192 fn update_contingent_order(&mut self, order: &OrderAny) {
2193 log::debug!("Updating OUO orders from {}", order.client_order_id());
2194 if let Some(linked_order_ids) = order.linked_order_ids() {
2195 for client_order_id in linked_order_ids {
2196 let mut child_order = match self.cache.borrow().order(client_order_id) {
2197 Some(order) => order.clone(),
2198 None => panic!("Order {client_order_id} not found in cache."),
2199 };
2200
2201 if child_order.is_active_local() {
2202 continue;
2203 }
2204
2205 if order.leaves_qty().is_zero() {
2206 self.cancel_order(&child_order, None);
2207 } else if child_order.leaves_qty() != order.leaves_qty() {
2208 let price = child_order.price();
2209 let trigger_price = child_order.trigger_price();
2210 self.update_order(
2211 &mut child_order,
2212 Some(order.leaves_qty()),
2213 price,
2214 trigger_price,
2215 Some(false),
2216 );
2217 }
2218 }
2219 }
2220 }
2221
2222 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2223 if let Some(linked_order_ids) = order.linked_order_ids() {
2224 for client_order_id in linked_order_ids {
2225 let contingent_order = match self.cache.borrow().order(client_order_id) {
2226 Some(order) => order.clone(),
2227 None => panic!("Cannot find contingent order for {client_order_id}"),
2228 };
2229 if contingent_order.is_active_local() {
2230 continue;
2232 }
2233 if !contingent_order.is_closed() {
2234 self.cancel_order(&contingent_order, Some(false));
2235 }
2236 }
2237 }
2238 }
2239
2240 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2243 let ts_now = self.clock.borrow().timestamp_ns();
2244 let account_id = order
2245 .account_id()
2246 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2247
2248 let event = OrderEventAny::Rejected(OrderRejected::new(
2249 order.trader_id(),
2250 order.strategy_id(),
2251 order.instrument_id(),
2252 order.client_order_id(),
2253 account_id,
2254 reason,
2255 UUID4::new(),
2256 ts_now,
2257 ts_now,
2258 false,
2259 ));
2260 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2261 }
2262
2263 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2264 let ts_now = self.clock.borrow().timestamp_ns();
2265 let account_id = order
2266 .account_id()
2267 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2268 let event = OrderEventAny::Accepted(OrderAccepted::new(
2269 order.trader_id(),
2270 order.strategy_id(),
2271 order.instrument_id(),
2272 order.client_order_id(),
2273 venue_order_id,
2274 account_id,
2275 UUID4::new(),
2276 ts_now,
2277 ts_now,
2278 false,
2279 ));
2280 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2281
2282 order.apply(event).expect("Failed to apply order event");
2284 }
2285
2286 #[allow(clippy::too_many_arguments)]
2287 fn generate_order_modify_rejected(
2288 &self,
2289 trader_id: TraderId,
2290 strategy_id: StrategyId,
2291 instrument_id: InstrumentId,
2292 client_order_id: ClientOrderId,
2293 reason: Ustr,
2294 venue_order_id: Option<VenueOrderId>,
2295 account_id: Option<AccountId>,
2296 ) {
2297 let ts_now = self.clock.borrow().timestamp_ns();
2298 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2299 trader_id,
2300 strategy_id,
2301 instrument_id,
2302 client_order_id,
2303 reason,
2304 UUID4::new(),
2305 ts_now,
2306 ts_now,
2307 false,
2308 venue_order_id,
2309 account_id,
2310 ));
2311 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2312 }
2313
2314 #[allow(clippy::too_many_arguments)]
2315 fn generate_order_cancel_rejected(
2316 &self,
2317 trader_id: TraderId,
2318 strategy_id: StrategyId,
2319 account_id: AccountId,
2320 instrument_id: InstrumentId,
2321 client_order_id: ClientOrderId,
2322 venue_order_id: VenueOrderId,
2323 reason: Ustr,
2324 ) {
2325 let ts_now = self.clock.borrow().timestamp_ns();
2326 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
2327 trader_id,
2328 strategy_id,
2329 instrument_id,
2330 client_order_id,
2331 reason,
2332 UUID4::new(),
2333 ts_now,
2334 ts_now,
2335 false,
2336 Some(venue_order_id),
2337 Some(account_id),
2338 ));
2339 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2340 }
2341
2342 fn generate_order_updated(
2343 &self,
2344 order: &mut OrderAny,
2345 quantity: Quantity,
2346 price: Option<Price>,
2347 trigger_price: Option<Price>,
2348 ) {
2349 let ts_now = self.clock.borrow().timestamp_ns();
2350 let event = OrderEventAny::Updated(OrderUpdated::new(
2351 order.trader_id(),
2352 order.strategy_id(),
2353 order.instrument_id(),
2354 order.client_order_id(),
2355 quantity,
2356 UUID4::new(),
2357 ts_now,
2358 ts_now,
2359 false,
2360 order.venue_order_id(),
2361 order.account_id(),
2362 price,
2363 trigger_price,
2364 ));
2365 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2366
2367 order.apply(event).expect("Failed to apply order event");
2369 }
2370
2371 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
2372 let ts_now = self.clock.borrow().timestamp_ns();
2373 let event = OrderEventAny::Canceled(OrderCanceled::new(
2374 order.trader_id(),
2375 order.strategy_id(),
2376 order.instrument_id(),
2377 order.client_order_id(),
2378 UUID4::new(),
2379 ts_now,
2380 ts_now,
2381 false,
2382 Some(venue_order_id),
2383 order.account_id(),
2384 ));
2385 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2386 }
2387
2388 fn generate_order_triggered(&self, order: &OrderAny) {
2389 let ts_now = self.clock.borrow().timestamp_ns();
2390 let event = OrderEventAny::Triggered(OrderTriggered::new(
2391 order.trader_id(),
2392 order.strategy_id(),
2393 order.instrument_id(),
2394 order.client_order_id(),
2395 UUID4::new(),
2396 ts_now,
2397 ts_now,
2398 false,
2399 order.venue_order_id(),
2400 order.account_id(),
2401 ));
2402 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2403 }
2404
2405 fn generate_order_expired(&self, order: &OrderAny) {
2406 let ts_now = self.clock.borrow().timestamp_ns();
2407 let event = OrderEventAny::Expired(OrderExpired::new(
2408 order.trader_id(),
2409 order.strategy_id(),
2410 order.instrument_id(),
2411 order.client_order_id(),
2412 UUID4::new(),
2413 ts_now,
2414 ts_now,
2415 false,
2416 order.venue_order_id(),
2417 order.account_id(),
2418 ));
2419 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2420 }
2421
2422 #[allow(clippy::too_many_arguments)]
2423 fn generate_order_filled(
2424 &mut self,
2425 order: &mut OrderAny,
2426 venue_order_id: VenueOrderId,
2427 venue_position_id: Option<PositionId>,
2428 last_qty: Quantity,
2429 last_px: Price,
2430 quote_currency: Currency,
2431 commission: Money,
2432 liquidity_side: LiquiditySide,
2433 ) {
2434 let ts_now = self.clock.borrow().timestamp_ns();
2435 let account_id = order
2436 .account_id()
2437 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2438 let event = OrderEventAny::Filled(OrderFilled::new(
2439 order.trader_id(),
2440 order.strategy_id(),
2441 order.instrument_id(),
2442 order.client_order_id(),
2443 venue_order_id,
2444 account_id,
2445 self.ids_generator.generate_trade_id(),
2446 order.order_side(),
2447 order.order_type(),
2448 last_qty,
2449 last_px,
2450 quote_currency,
2451 liquidity_side,
2452 UUID4::new(),
2453 ts_now,
2454 ts_now,
2455 false,
2456 venue_position_id,
2457 Some(commission),
2458 ));
2459 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2460
2461 order.apply(event).expect("Failed to apply order event");
2463 }
2464}