1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::RefCell,
23 cmp::min,
24 collections::HashMap,
25 ops::{Add, Sub},
26 rc::Rc,
27};
28
29use chrono::TimeDelta;
30use nautilus_common::{
31 cache::Cache,
32 clock::Clock,
33 msgbus::{self},
34};
35use nautilus_core::{UUID4, UnixNanos};
36use nautilus_model::{
37 data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
38 enums::{
39 AccountType, AggregationSource, AggressorSide, BarAggregation, BookType, ContingencyType,
40 LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
41 OrderStatus, OrderType, PriceType, TimeInForce,
42 },
43 events::{
44 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
45 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
46 },
47 identifiers::{
48 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
49 VenueOrderId,
50 },
51 instruments::{EXPIRING_INSTRUMENT_TYPES, Instrument, InstrumentAny},
52 orderbook::OrderBook,
53 orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
54 position::Position,
55 types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
56};
57use ustr::Ustr;
58
59use crate::{
60 matching_core::OrderMatchingCore,
61 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
62 messages::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
63 models::{
64 fee::{FeeModel, FeeModelAny},
65 fill::FillModel,
66 },
67 trailing::trailing_stop_calculate,
68};
69
/// An order matching engine for a single instrument.
///
/// Owns the instrument's order book and the matching core holding resting orders, and
/// processes market data (deltas, quotes, trades, bars) and order commands against them.
pub struct OrderMatchingEngine {
    /// The venue for the matching engine (taken from the instrument ID at construction).
    pub venue: Venue,
    /// The instrument this engine matches orders for.
    pub instrument: InstrumentAny,
    /// The raw integer ID of the engine; also handed to the IDs generator.
    pub raw_id: u32,
    /// The order book type; decides which market data updates are applied to the book.
    pub book_type: BookType,
    /// The order management system type; handed to the IDs generator and consulted when
    /// resolving the position for an incoming order.
    pub oms_type: OmsType,
    /// The account type; consulted by the short-selling check during order processing.
    pub account_type: AccountType,
    /// The current market status (starts as `Open`, updated by `process_status`).
    pub market_status: MarketStatus,
    /// The configuration for the engine.
    pub config: OrderMatchingEngineConfig,
    // Shared clock, read for the instrument activation/expiration checks
    clock: Rc<RefCell<dyn Clock>>,
    // Shared cache used to look up orders and positions
    cache: Rc<RefCell<Cache>>,
    // Order book for the instrument
    book: OrderBook,
    /// The matching core which tracks resting orders and the current bid/ask/last prices.
    pub core: OrderMatchingCore,
    // Fill model, replaceable through `set_fill_model`
    fill_model: FillModel,
    // Fee model supplied at construction
    fee_model: FeeModelAny,
    // Saved core bid/ask/last, put back into the core while iterating orders after the
    // limit-fill logic has temporarily overridden the core prices
    target_bid: Option<Price>,
    target_ask: Option<Price>,
    target_last: Option<Price>,
    // Latest bid-priced and ask-priced bars, held until both sides share a `ts_event`
    last_bar_bid: Option<Bar>,
    last_bar_ask: Option<Bar>,
    // Bar type currently used for execution, per instrument
    execution_bar_types: HashMap<InstrumentId, BarType>,
    // Cached time interval for each bar type seen by `process_bar`
    execution_bar_deltas: HashMap<BarType, TimeDelta>,
    // Account ID recorded per trader when an order is processed
    account_ids: HashMap<TraderId, AccountId>,
    // Filled quantity per order, used to skip fills for orders that are already fully filled
    cached_filled_qty: HashMap<ClientOrderId, Quantity>,
    // Generator for trade IDs and position IDs (other uses not visible in this chunk)
    ids_generator: IdsGenerator,
}
105
106impl OrderMatchingEngine {
107 #[allow(clippy::too_many_arguments)]
108 pub fn new(
109 instrument: InstrumentAny,
110 raw_id: u32,
111 fill_model: FillModel,
112 fee_model: FeeModelAny,
113 book_type: BookType,
114 oms_type: OmsType,
115 account_type: AccountType,
116 clock: Rc<RefCell<dyn Clock>>,
117 cache: Rc<RefCell<Cache>>,
118 config: OrderMatchingEngineConfig,
119 ) -> Self {
120 let book = OrderBook::new(instrument.id(), book_type);
121 let core = OrderMatchingCore::new(
122 instrument.id(),
123 instrument.price_increment(),
124 None, None, None, );
128 let ids_generator = IdsGenerator::new(
129 instrument.id().venue,
130 oms_type,
131 raw_id,
132 config.use_random_ids,
133 config.use_position_ids,
134 cache.clone(),
135 );
136
137 Self {
138 venue: instrument.id().venue,
139 instrument,
140 raw_id,
141 fill_model,
142 fee_model,
143 book_type,
144 oms_type,
145 account_type,
146 clock,
147 cache,
148 book,
149 core,
150 market_status: MarketStatus::Open,
151 config,
152 target_bid: None,
153 target_ask: None,
154 target_last: None,
155 last_bar_bid: None,
156 last_bar_ask: None,
157 execution_bar_types: HashMap::new(),
158 execution_bar_deltas: HashMap::new(),
159 account_ids: HashMap::new(),
160 cached_filled_qty: HashMap::new(),
161 ids_generator,
162 }
163 }
164
165 pub fn reset(&mut self) {
166 self.book.clear(0, UnixNanos::default());
167 self.execution_bar_types.clear();
168 self.execution_bar_deltas.clear();
169 self.account_ids.clear();
170 self.cached_filled_qty.clear();
171 self.core.reset();
172 self.target_bid = None;
173 self.target_ask = None;
174 self.target_last = None;
175 self.ids_generator.reset();
176
177 log::info!("Reset {}", self.instrument.id());
178 }
179
/// Replaces the engine's fill model with `fill_model`.
pub const fn set_fill_model(&mut self, fill_model: FillModel) {
    self.fill_model = fill_model;
}
183
/// Returns the best bid price reported by the engine's order book, if any.
#[must_use]
pub fn best_bid_price(&self) -> Option<Price> {
    self.book.best_bid_price()
}
188
/// Returns the best ask price reported by the engine's order book, if any.
#[must_use]
pub fn best_ask_price(&self) -> Option<Price> {
    self.book.best_ask_price()
}
193
/// Returns a reference to the engine's order book.
#[must_use]
pub const fn get_book(&self) -> &OrderBook {
    &self.book
}
198
/// Returns the bid-side orders currently held by the matching core.
#[must_use]
pub fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
    self.core.get_orders_bid()
}
203
/// Returns the ask-side orders currently held by the matching core.
#[must_use]
pub fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
    self.core.get_orders_ask()
}
208
209 #[must_use]
210 pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
211 let mut orders = Vec::new();
213 orders.extend_from_slice(self.core.get_orders_bid());
214 orders.extend_from_slice(self.core.get_orders_ask());
215 orders
216 }
217
/// Returns whether the matching core holds an order with the given `client_order_id`.
#[must_use]
pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
    self.core.order_exists(client_order_id)
}
222
223 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) {
227 log::debug!("Processing {delta}");
228
229 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
230 self.book.apply_delta(delta);
231 }
232
233 self.iterate(delta.ts_event);
234 }
235
236 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
237 log::debug!("Processing {deltas}");
238
239 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
240 self.book.apply_deltas(deltas);
241 }
242
243 self.iterate(deltas.ts_event);
244 }
245
246 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
247 log::debug!("Processing {quote}");
248
249 if self.book_type == BookType::L1_MBP {
250 self.book.update_quote_tick(quote).unwrap();
251 }
252
253 self.iterate(quote.ts_event);
254 }
255
256 pub fn process_bar(&mut self, bar: &Bar) {
257 log::debug!("Processing {bar}");
258
259 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
261 return;
262 }
263
264 let bar_type = bar.bar_type;
265 if bar_type.aggregation_source() == AggregationSource::Internal {
267 return;
268 }
269
270 if bar_type.spec().aggregation == BarAggregation::Month {
272 return;
273 }
274
275 let execution_bar_type =
276 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
277 execution_bar_type.to_owned()
278 } else {
279 self.execution_bar_types
280 .insert(bar.instrument_id(), bar_type);
281 self.execution_bar_deltas
282 .insert(bar_type, bar_type.spec().timedelta());
283 bar_type
284 };
285
286 if execution_bar_type != bar_type {
287 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
288 if bar_type_timedelta.is_none() {
289 bar_type_timedelta = Some(bar_type.spec().timedelta());
290 self.execution_bar_deltas
291 .insert(bar_type, bar_type_timedelta.unwrap());
292 }
293 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
294 >= &bar_type_timedelta.unwrap()
295 {
296 self.execution_bar_types
297 .insert(bar_type.instrument_id(), bar_type);
298 } else {
299 return;
300 }
301 }
302
303 match bar_type.spec().price_type {
304 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
305 PriceType::Bid => {
306 self.last_bar_bid = Some(bar.to_owned());
307 self.process_quote_ticks_from_bar(bar);
308 }
309 PriceType::Ask => {
310 self.last_bar_ask = Some(bar.to_owned());
311 self.process_quote_ticks_from_bar(bar);
312 }
313 PriceType::Mark => panic!("Not implemented"),
314 }
315 }
316
317 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
318 let size = Quantity::new(bar.volume.as_f64() / 4.0, bar.volume.precision);
320 let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
321 {
322 AggressorSide::Buyer
323 } else {
324 AggressorSide::Seller
325 };
326
327 let mut trade_tick = TradeTick::new(
329 bar.instrument_id(),
330 bar.open,
331 size,
332 aggressor_side,
333 self.ids_generator.generate_trade_id(),
334 bar.ts_event,
335 bar.ts_event,
336 );
337
338 if !self.core.is_last_initialized {
341 self.book.update_trade_tick(&trade_tick).unwrap();
342 self.iterate(trade_tick.ts_init);
343 self.core.set_last_raw(trade_tick.price);
344 }
345
346 if self.core.last.is_some_and(|last| bar.high > last) {
349 trade_tick.price = bar.high;
350 trade_tick.aggressor_side = AggressorSide::Buyer;
351 trade_tick.trade_id = self.ids_generator.generate_trade_id();
352
353 self.book.update_trade_tick(&trade_tick).unwrap();
354 self.iterate(trade_tick.ts_init);
355
356 self.core.set_last_raw(trade_tick.price);
357 }
358
359 if self.core.last.is_some_and(|last| bar.low < last) {
363 trade_tick.price = bar.low;
364 trade_tick.aggressor_side = AggressorSide::Seller;
365 trade_tick.trade_id = self.ids_generator.generate_trade_id();
366
367 self.book.update_trade_tick(&trade_tick).unwrap();
368 self.iterate(trade_tick.ts_init);
369
370 self.core.set_last_raw(trade_tick.price);
371 }
372
373 if self.core.last.is_some_and(|last| bar.close != last) {
378 trade_tick.price = bar.close;
379 trade_tick.aggressor_side = if bar.close > self.core.last.unwrap() {
380 AggressorSide::Buyer
381 } else {
382 AggressorSide::Seller
383 };
384 trade_tick.trade_id = self.ids_generator.generate_trade_id();
385
386 self.book.update_trade_tick(&trade_tick).unwrap();
387 self.iterate(trade_tick.ts_init);
388
389 self.core.set_last_raw(trade_tick.price);
390 }
391 }
392
393 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
394 if self.last_bar_bid.is_none()
396 || self.last_bar_ask.is_none()
397 || self.last_bar_bid.unwrap().ts_event != self.last_bar_ask.unwrap().ts_event
398 {
399 return;
400 }
401 let bid_bar = self.last_bar_bid.unwrap();
402 let ask_bar = self.last_bar_ask.unwrap();
403 let bid_size = Quantity::new(bid_bar.volume.as_f64() / 4.0, bar.volume.precision);
404 let ask_size = Quantity::new(ask_bar.volume.as_f64() / 4.0, bar.volume.precision);
405
406 let mut quote_tick = QuoteTick::new(
408 self.book.instrument_id,
409 bid_bar.open,
410 ask_bar.open,
411 bid_size,
412 ask_size,
413 bid_bar.ts_init,
414 bid_bar.ts_init,
415 );
416
417 self.book.update_quote_tick("e_tick).unwrap();
419 self.iterate(quote_tick.ts_init);
420
421 quote_tick.bid_price = bid_bar.high;
423 quote_tick.ask_price = ask_bar.high;
424 self.book.update_quote_tick("e_tick).unwrap();
425 self.iterate(quote_tick.ts_init);
426
427 quote_tick.bid_price = bid_bar.low;
429 quote_tick.ask_price = ask_bar.low;
430 self.book.update_quote_tick("e_tick).unwrap();
431 self.iterate(quote_tick.ts_init);
432
433 quote_tick.bid_price = bid_bar.close;
435 quote_tick.ask_price = ask_bar.close;
436 self.book.update_quote_tick("e_tick).unwrap();
437 self.iterate(quote_tick.ts_init);
438
439 self.last_bar_bid = None;
441 self.last_bar_ask = None;
442 }
443
444 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
445 log::debug!("Processing {trade}");
446
447 if self.book_type == BookType::L1_MBP {
448 self.book.update_trade_tick(trade).unwrap();
449 }
450 self.core.set_last_raw(trade.price);
451
452 self.iterate(trade.ts_event);
453 }
454
455 pub fn process_status(&mut self, action: MarketStatusAction) {
456 log::debug!("Processing {action}");
457
458 if self.market_status == MarketStatus::Closed
460 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
461 {
462 self.market_status = MarketStatus::Open;
463 }
464 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
466 self.market_status = MarketStatus::Paused;
467 }
468 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
470 self.market_status = MarketStatus::Suspended;
471 }
472 if self.market_status == MarketStatus::Open
474 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
475 {
476 self.market_status = MarketStatus::Closed;
477 }
478 }
479
480 #[allow(clippy::needless_return)]
483 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
484 {
486 let cache_borrow = self.cache.as_ref().borrow();
487
488 if self.core.order_exists(order.client_order_id()) {
489 self.generate_order_rejected(order, "Order already exists".into());
490 return;
491 }
492
493 self.account_ids.insert(order.trader_id(), account_id);
495
496 if EXPIRING_INSTRUMENT_TYPES.contains(&self.instrument.instrument_class()) {
498 if let Some(activation_ns) = self.instrument.activation_ns() {
499 if self.clock.borrow().timestamp_ns() < activation_ns {
500 self.generate_order_rejected(
501 order,
502 format!(
503 "Contract {} is not yet active, activation {}",
504 self.instrument.id(),
505 self.instrument.activation_ns().unwrap()
506 )
507 .into(),
508 );
509 return;
510 }
511 }
512 if let Some(expiration_ns) = self.instrument.expiration_ns() {
513 if self.clock.borrow().timestamp_ns() >= expiration_ns {
514 self.generate_order_rejected(
515 order,
516 format!(
517 "Contract {} has expired, expiration {}",
518 self.instrument.id(),
519 self.instrument.expiration_ns().unwrap()
520 )
521 .into(),
522 );
523 return;
524 }
525 }
526 }
527
528 if self.config.support_contingent_orders {
530 if let Some(parent_order_id) = order.parent_order_id() {
531 let parent_order = cache_borrow.order(&parent_order_id);
532 if parent_order.is_none()
533 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
534 {
535 panic!("OTO parent not found");
536 }
537 if let Some(parent_order) = parent_order {
538 let parent_order_status = parent_order.status();
539 let order_is_open = order.is_open();
540 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
541 self.generate_order_rejected(
542 order,
543 format!("Rejected OTO order from {parent_order_id}").into(),
544 );
545 return;
546 } else if parent_order.status() == OrderStatus::Accepted
547 && parent_order.status() == OrderStatus::Triggered
548 {
549 log::info!(
550 "Pending OTO order {} triggers from {parent_order_id}",
551 order.client_order_id(),
552 );
553 return;
554 }
555 }
556 }
557
558 if let Some(linked_order_ids) = order.linked_order_ids() {
559 for client_order_id in linked_order_ids {
560 match cache_borrow.order(client_order_id) {
561 Some(contingent_order)
562 if (order.contingency_type().unwrap() == ContingencyType::Oco
563 || order.contingency_type().unwrap()
564 == ContingencyType::Ouo)
565 && !order.is_closed()
566 && contingent_order.is_closed() =>
567 {
568 self.generate_order_rejected(
569 order,
570 format!("Contingent order {client_order_id} already closed")
571 .into(),
572 );
573 return;
574 }
575 None => panic!("Cannot find contingent order for {client_order_id}"),
576 _ => {}
577 }
578 }
579 }
580 }
581
582 if order.quantity().precision != self.instrument.size_precision() {
584 self.generate_order_rejected(
585 order,
586 format!(
587 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
588 order.client_order_id(),
589 order.quantity().precision,
590 self.instrument.id(),
591 self.instrument.size_precision()
592 )
593 .into(),
594 );
595 return;
596 }
597
598 if let Some(price) = order.price() {
600 if price.precision != self.instrument.price_precision() {
601 self.generate_order_rejected(
602 order,
603 format!(
604 "Invalid order price precision for order {}, was {} when {} price precision is {}",
605 order.client_order_id(),
606 price.precision,
607 self.instrument.id(),
608 self.instrument.price_precision()
609 )
610 .into(),
611 );
612 return;
613 }
614 }
615
616 if let Some(trigger_price) = order.trigger_price() {
618 if trigger_price.precision != self.instrument.price_precision() {
619 self.generate_order_rejected(
620 order,
621 format!(
622 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
623 order.client_order_id(),
624 trigger_price.precision,
625 self.instrument.id(),
626 self.instrument.price_precision()
627 )
628 .into(),
629 );
630 return;
631 }
632 }
633
634 let position: Option<&Position> = cache_borrow
636 .position_for_order(&order.client_order_id())
637 .or_else(|| {
638 if self.oms_type == OmsType::Netting {
639 let position_id = PositionId::new(
640 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
641 );
642 cache_borrow.position(&position_id)
643 } else {
644 None
645 }
646 });
647
648 if order.order_side() == OrderSide::Sell
650 && self.account_type != AccountType::Margin
651 && matches!(self.instrument, InstrumentAny::Equity(_))
652 && (position.is_none()
653 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
654 {
655 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
656 self.generate_order_rejected(
657 order,
658 format!(
659 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
660 )
661 .into(),
662 );
663 return;
664 }
665
666 if self.config.use_reduce_only
668 && order.is_reduce_only()
669 && !order.is_closed()
670 && position.is_none_or(|pos| {
671 pos.is_closed()
672 || (order.is_buy() && pos.is_long())
673 || (order.is_sell() && pos.is_short())
674 })
675 {
676 self.generate_order_rejected(
677 order,
678 format!(
679 "Reduce-only order {} ({}-{}) would have increased position",
680 order.client_order_id(),
681 order.order_type().to_string().to_uppercase(),
682 order.order_side().to_string().to_uppercase()
683 )
684 .into(),
685 );
686 return;
687 }
688 }
689
690 match order.order_type() {
691 OrderType::Market => self.process_market_order(order),
692 OrderType::Limit => self.process_limit_order(order),
693 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
694 OrderType::StopMarket => self.process_stop_market_order(order),
695 OrderType::StopLimit => self.process_stop_limit_order(order),
696 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
697 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
698 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
699 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
700 }
701 }
702
703 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
704 if let Some(order) = self.core.get_order(command.client_order_id) {
705 self.update_order(
706 &mut order.to_any(),
707 command.quantity,
708 command.price,
709 command.trigger_price,
710 None,
711 );
712 } else {
713 self.generate_order_modify_rejected(
714 command.trader_id,
715 command.strategy_id,
716 command.instrument_id,
717 command.client_order_id,
718 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
719 Some(command.venue_order_id),
720 Some(account_id),
721 );
722 }
723 }
724
725 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
726 match self.core.get_order(command.client_order_id) {
727 Some(passive_order) => {
728 if passive_order.is_inflight() || passive_order.is_open() {
729 self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
730 }
731 }
732 None => self.generate_order_cancel_rejected(
733 command.trader_id,
734 command.strategy_id,
735 account_id,
736 command.instrument_id,
737 command.client_order_id,
738 command.venue_order_id,
739 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
740 ),
741 }
742 }
743
744 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
745 let open_orders = self
746 .cache
747 .borrow()
748 .orders_open(None, Some(&command.instrument_id), None, None)
749 .into_iter()
750 .cloned()
751 .collect::<Vec<OrderAny>>();
752 for order in open_orders {
753 if command.order_side != OrderSide::NoOrderSide
754 && command.order_side != order.order_side()
755 {
756 continue;
757 }
758 if order.is_inflight() || order.is_open() {
759 self.cancel_order(&order, None);
760 }
761 }
762 }
763
764 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
765 for order in &command.cancels {
766 self.process_cancel(order, account_id);
767 }
768 }
769
770 fn process_market_order(&mut self, order: &mut OrderAny) {
771 if order.time_in_force() == TimeInForce::AtTheOpen
772 || order.time_in_force() == TimeInForce::AtTheClose
773 {
774 log::error!(
775 "Market auction for the time in force {} is currently not supported",
776 order.time_in_force()
777 );
778 return;
779 }
780
781 let order_side = order.order_side();
783 let is_ask_initialized = self.core.is_ask_initialized;
784 let is_bid_initialized = self.core.is_bid_initialized;
785 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
786 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
787 {
788 self.generate_order_rejected(
789 order,
790 format!("No market for {}", order.instrument_id()).into(),
791 );
792 return;
793 }
794
795 self.fill_market_order(order);
796 }
797
798 fn process_limit_order(&mut self, order: &mut OrderAny) {
799 let limit_px = order.price().expect("Limit order must have a price");
800 if order.is_post_only()
801 && self
802 .core
803 .is_limit_matched(order.order_side_specified(), limit_px)
804 {
805 self.generate_order_rejected(
806 order,
807 format!(
808 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
809 order.order_type(),
810 order.order_side(),
811 order.price().unwrap(),
812 self.core
813 .bid
814 .map_or_else(|| "None".to_string(), |p| p.to_string()),
815 self.core
816 .ask
817 .map_or_else(|| "None".to_string(), |p| p.to_string())
818 )
819 .into(),
820 );
821 return;
822 }
823
824 self.accept_order(order);
826
827 if self
829 .core
830 .is_limit_matched(order.order_side_specified(), limit_px)
831 {
832 if order.liquidity_side().is_some()
834 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
835 {
836 order.set_liquidity_side(LiquiditySide::Taker);
837 }
838 self.fill_limit_order(order);
839 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
840 self.cancel_order(order, None);
841 }
842 }
843
844 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
845 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
847 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
848 {
849 self.generate_order_rejected(
850 order,
851 format!("No market for {}", order.instrument_id()).into(),
852 );
853 return;
854 }
855
856 self.fill_market_order(order);
858
859 if order.is_open() {
860 self.accept_order(order);
861 }
862 }
863
864 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
865 let stop_px = order
866 .trigger_price()
867 .expect("Stop order must have a trigger price");
868 if self
869 .core
870 .is_stop_matched(order.order_side_specified(), stop_px)
871 {
872 if self.config.reject_stop_orders {
873 self.generate_order_rejected(
874 order,
875 format!(
876 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
877 order.order_type(),
878 order.order_side(),
879 order.trigger_price().unwrap(),
880 self.core
881 .bid
882 .map_or_else(|| "None".to_string(), |p| p.to_string()),
883 self.core
884 .ask
885 .map_or_else(|| "None".to_string(), |p| p.to_string())
886 ).into(),
887 );
888 return;
889 }
890 self.fill_market_order(order);
891 return;
892 }
893
894 self.accept_order(order);
896 }
897
898 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
899 let stop_px = order
900 .trigger_price()
901 .expect("Stop order must have a trigger price");
902 if self
903 .core
904 .is_stop_matched(order.order_side_specified(), stop_px)
905 {
906 if self.config.reject_stop_orders {
907 self.generate_order_rejected(
908 order,
909 format!(
910 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
911 order.order_type(),
912 order.order_side(),
913 order.trigger_price().unwrap(),
914 self.core
915 .bid
916 .map_or_else(|| "None".to_string(), |p| p.to_string()),
917 self.core
918 .ask
919 .map_or_else(|| "None".to_string(), |p| p.to_string())
920 ).into(),
921 );
922 return;
923 }
924
925 self.accept_order(order);
926 self.generate_order_triggered(order);
927
928 let limit_px = order.price().expect("Stop limit order must have a price");
930 if self
931 .core
932 .is_limit_matched(order.order_side_specified(), limit_px)
933 {
934 order.set_liquidity_side(LiquiditySide::Taker);
935 self.fill_limit_order(order);
936 }
937 }
938
939 self.accept_order(order);
941 }
942
943 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
944 if self
945 .core
946 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
947 {
948 if self.config.reject_stop_orders {
949 self.generate_order_rejected(
950 order,
951 format!(
952 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
953 order.order_type(),
954 order.order_side(),
955 order.trigger_price().unwrap(),
956 self.core
957 .bid
958 .map_or_else(|| "None".to_string(), |p| p.to_string()),
959 self.core
960 .ask
961 .map_or_else(|| "None".to_string(), |p| p.to_string())
962 ).into(),
963 );
964 return;
965 }
966 self.fill_market_order(order);
967 return;
968 }
969
970 self.accept_order(order);
972 }
973
974 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
975 if self
976 .core
977 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
978 {
979 if self.config.reject_stop_orders {
980 self.generate_order_rejected(
981 order,
982 format!(
983 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
984 order.order_type(),
985 order.order_side(),
986 order.trigger_price().unwrap(),
987 self.core
988 .bid
989 .map_or_else(|| "None".to_string(), |p| p.to_string()),
990 self.core
991 .ask
992 .map_or_else(|| "None".to_string(), |p| p.to_string())
993 ).into(),
994 );
995 return;
996 }
997 self.accept_order(order);
998 self.generate_order_triggered(order);
999
1000 if self
1002 .core
1003 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1004 {
1005 order.set_liquidity_side(LiquiditySide::Taker);
1006 self.fill_limit_order(order);
1007 }
1008 return;
1009 }
1010
1011 self.accept_order(order);
1013 }
1014
1015 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1016 if let Some(trigger_price) = order.trigger_price() {
1017 if self
1018 .core
1019 .is_stop_matched(order.order_side_specified(), trigger_price)
1020 {
1021 self.generate_order_rejected(
1022 order,
1023 format!(
1024 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1025 order.order_type(),
1026 order.order_side(),
1027 trigger_price,
1028 self.core
1029 .bid
1030 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1031 self.core
1032 .ask
1033 .map_or_else(|| "None".to_string(), |p| p.to_string())
1034 ).into(),
1035 );
1036 return;
1037 }
1038 }
1039
1040 self.accept_order(order);
1042 }
1043
1044 pub fn iterate(&mut self, timestamp_ns: UnixNanos) {
1049 if self.book.has_bid() {
1053 self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1054 }
1055 if self.book.has_ask() {
1056 self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1057 }
1058 self.core.iterate();
1059
1060 self.core.bid = self.book.best_bid_price();
1061 self.core.ask = self.book.best_ask_price();
1062
1063 let orders_bid = self.core.get_orders_bid().to_vec();
1064 let orders_ask = self.core.get_orders_ask().to_vec();
1065
1066 self.iterate_orders(timestamp_ns, &orders_bid);
1067 self.iterate_orders(timestamp_ns, &orders_ask);
1068 }
1069
1070 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1071 for order in orders {
1072 if order.is_closed() {
1073 continue;
1074 }
1075
1076 if self.config.support_gtd_orders {
1078 if let Some(expire_time) = order.expire_time() {
1079 if timestamp_ns >= expire_time {
1080 self.core.delete_order(order).unwrap();
1082 self.cached_filled_qty.remove(&order.client_order_id());
1083 self.expire_order(order);
1084 }
1085 }
1086 }
1087
1088 if let PassiveOrderAny::Stop(o) = order {
1090 if let PassiveOrderAny::Stop(
1091 StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_),
1092 ) = order
1093 {
1094 let mut order = OrderAny::from(o.to_owned());
1095 self.update_trailing_stop_order(&mut order);
1096 }
1097 }
1098
1099 if let Some(target_bid) = self.target_bid {
1101 self.core.bid = Some(target_bid);
1102 self.target_bid = None;
1103 }
1104 if let Some(target_ask) = self.target_ask {
1105 self.core.ask = Some(target_ask);
1106 self.target_ask = None;
1107 }
1108 if let Some(target_last) = self.target_last {
1109 self.core.last = Some(target_last);
1110 self.target_last = None;
1111 }
1112 }
1113
1114 self.target_bid = None;
1116 self.target_ask = None;
1117 self.target_last = None;
1118 }
1119
/// Simulates filling a limit-priced `order` against the book and returns the resulting
/// `(price, quantity)` fills.
///
/// Besides computing fills, this may temporarily override the matching core's bid/ask/last
/// prices. Whenever it does, the core's current prices are first saved into
/// `target_bid`/`target_ask`/`target_last`, which `iterate_orders` later moves back.
///
/// # Panics
///
/// Panics if `order` has no price.
fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
    match order.price() {
        Some(order_price) => {
            // Synthetic book order carrying the order's side, limit price and quantity
            let book_order =
                BookOrder::new(order.order_side(), order_price, order.quantity(), 1);

            let mut fills = self.book.simulate_fills(&book_order);

            // Nothing would fill
            if fills.is_empty() {
                return fills;
            }

            // Triggered orders which are taking liquidity
            if let Some(triggered_price) = order.trigger_price() {
                if order
                    .liquidity_side()
                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
                {
                    if order.order_side() == OrderSide::Sell && order_price > triggered_price {
                        // Re-price the first fill at the trigger price, keeping its quantity
                        let first_fill = fills.first().unwrap();
                        let triggered_qty = first_fill.1;
                        fills[0] = (triggered_price, triggered_qty);
                        // Save the current core prices before overriding them
                        self.target_bid = self.core.bid;
                        self.target_ask = self.core.ask;
                        self.target_last = self.core.last;
                        self.core.set_ask_raw(order_price);
                        self.core.set_last_raw(order_price);
                    } else if order.order_side() == OrderSide::Buy
                        && order_price < triggered_price
                    {
                        // Re-price the first fill at the trigger price, keeping its quantity
                        let first_fill = fills.first().unwrap();
                        let triggered_qty = first_fill.1;
                        fills[0] = (triggered_price, triggered_qty);
                        // Save the current core prices before overriding them
                        self.target_bid = self.core.bid;
                        self.target_ask = self.core.ask;
                        self.target_last = self.core.last;
                        self.core.set_bid_raw(order_price);
                        self.core.set_last_raw(order_price);
                    }
                }
            }

            // Orders which are providing liquidity
            if order
                .liquidity_side()
                .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
            {
                match order.order_side().as_specified() {
                    OrderSideSpecified::Buy => {
                        // Use the trigger price when the limit price is above it
                        let target_price = if order
                            .trigger_price()
                            .is_some_and(|trigger_price| order_price > trigger_price)
                        {
                            order.trigger_price().unwrap()
                        } else {
                            order_price
                        };
                        for fill in &fills {
                            let last_px = fill.0;
                            if last_px < order_price {
                                // Fill is below the buy limit: save the core prices, then
                                // move the core's ask and last to the target price
                                self.target_bid = self.core.bid;
                                self.target_ask = self.core.ask;
                                self.target_last = self.core.last;
                                self.core.set_ask_raw(target_price);
                                self.core.set_last_raw(target_price);
                            }
                        }
                    }
                    OrderSideSpecified::Sell => {
                        // Use the trigger price when the limit price is below it
                        let target_price = if order
                            .trigger_price()
                            .is_some_and(|trigger_price| order_price < trigger_price)
                        {
                            order.trigger_price().unwrap()
                        } else {
                            order_price
                        };
                        for fill in &fills {
                            let last_px = fill.0;
                            if last_px > order_price {
                                // Fill is above the sell limit: save the core prices, then
                                // move the core's bid and last to the target price
                                self.target_bid = self.core.bid;
                                self.target_ask = self.core.ask;
                                self.target_last = self.core.last;
                                self.core.set_bid_raw(target_price);
                                self.core.set_last_raw(target_price);
                            }
                        }
                    }
                }
            }

            fills
        }
        None => panic!("Limit order must have a price"),
    }
}
1223
1224 fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1225 let price = match order.order_side().as_specified() {
1227 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1228 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1229 };
1230
1231 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1233 self.book.simulate_fills(&book_order)
1234 }
1235
1236 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1237 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id()) {
1238 if filled_qty >= &order.quantity() {
1239 log::info!(
1240 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1241 filled_qty,
1242 order.quantity(),
1243 order.filled_qty(),
1244 order.quantity()
1245 );
1246 return;
1247 }
1248 }
1249
1250 let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1251 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1252 let cache = self.cache.as_ref().borrow();
1253 cache.position(&venue_position_id).cloned()
1254 } else {
1255 None
1256 };
1257
1258 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1259 log::warn!(
1260 "Canceling REDUCE_ONLY {} as would increase position",
1261 order.order_type()
1262 );
1263 self.cancel_order(order, None);
1264 return;
1265 }
1266 order.set_liquidity_side(LiquiditySide::Taker);
1268 let fills = self.determine_market_price_and_volume(order);
1269 self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1270 }
1271
1272 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1273 match order.price() {
1274 Some(order_price) => {
1275 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1276 if cached_filled_qty.is_some() && *cached_filled_qty.unwrap() >= order.quantity() {
1277 log::debug!(
1278 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1279 cached_filled_qty.unwrap(),
1280 order.quantity(),
1281 order.filled_qty(),
1282 order.leaves_qty(),
1283 );
1284 return;
1285 }
1286
1287 if order
1288 .liquidity_side()
1289 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1290 {
1291 if order.order_side() == OrderSide::Buy
1292 && self.core.bid.is_some_and(|bid| bid == order_price)
1293 && !self.fill_model.is_limit_filled()
1294 {
1295 return;
1297 }
1298 if order.order_side() == OrderSide::Sell
1299 && self.core.ask.is_some_and(|ask| ask == order_price)
1300 && !self.fill_model.is_limit_filled()
1301 {
1302 return;
1304 }
1305 }
1306
1307 let venue_position_id = self.ids_generator.get_position_id(order, None);
1308 let position = if let Some(venue_position_id) = venue_position_id {
1309 let cache = self.cache.as_ref().borrow();
1310 cache.position(&venue_position_id).cloned()
1311 } else {
1312 None
1313 };
1314
1315 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1316 log::warn!(
1317 "Canceling REDUCE_ONLY {} as would increase position",
1318 order.order_type()
1319 );
1320 self.cancel_order(order, None);
1321 return;
1322 }
1323
1324 let fills = self.determine_limit_price_and_volume(order);
1325
1326 self.apply_fills(
1327 order,
1328 fills,
1329 order.liquidity_side().unwrap(),
1330 venue_position_id,
1331 position,
1332 );
1333 }
1334 None => panic!("Limit order must have a price"),
1335 }
1336 }
1337
1338 fn apply_fills(
1339 &mut self,
1340 order: &mut OrderAny,
1341 fills: Vec<(Price, Quantity)>,
1342 liquidity_side: LiquiditySide,
1343 venue_position_id: Option<PositionId>,
1344 position: Option<Position>,
1345 ) {
1346 if order.time_in_force() == TimeInForce::Fok {
1347 let mut total_size = Quantity::zero(order.quantity().precision);
1348 for (fill_px, fill_qty) in &fills {
1349 total_size = total_size.add(*fill_qty);
1350 }
1351
1352 if order.leaves_qty() > total_size {
1353 self.cancel_order(order, None);
1354 return;
1355 }
1356 }
1357
1358 if fills.is_empty() {
1359 if order.status() == OrderStatus::Submitted {
1360 self.generate_order_rejected(
1361 order,
1362 format!("No market for {}", order.instrument_id()).into(),
1363 );
1364 } else {
1365 log::error!(
1366 "Cannot fill order: no fills from book when fills were expected (check size in data)"
1367 );
1368 return;
1369 }
1370 }
1371
1372 if self.oms_type == OmsType::Netting {
1373 let venue_position_id: Option<PositionId> = None;
1374 }
1375
1376 let mut initial_market_to_limit_fill = false;
1377 for &(mut fill_px, ref fill_qty) in &fills {
1378 assert!(
1380 (fill_px.precision == self.instrument.price_precision()),
1381 "Invalid price precision for fill price {} when instrument price precision is {}.\
1382 Check that the data price precision matches the {} instrument",
1383 fill_px.precision,
1384 self.instrument.price_precision(),
1385 self.instrument.id()
1386 );
1387
1388 assert!(
1390 (fill_qty.precision == self.instrument.size_precision()),
1391 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
1392 Check that the data quantity precision matches the {} instrument",
1393 fill_qty.precision,
1394 self.instrument.size_precision(),
1395 self.instrument.id()
1396 );
1397
1398 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
1399 && order.order_type() == OrderType::MarketToLimit
1400 {
1401 self.generate_order_updated(order, order.quantity(), Some(fill_px), None);
1402 initial_market_to_limit_fill = true;
1403 }
1404
1405 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
1406 fill_px = match order.order_side().as_specified() {
1407 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
1408 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
1409 }
1410 }
1411
1412 if self.config.use_reduce_only && order.is_reduce_only() {
1414 if let Some(position) = &position {
1415 if *fill_qty > position.quantity {
1416 if position.quantity == Quantity::zero(position.quantity.precision) {
1417 return;
1419 }
1420
1421 let adjusted_fill_qty =
1423 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
1424
1425 self.generate_order_updated(order, adjusted_fill_qty, None, None);
1426 }
1427 }
1428 }
1429
1430 if fill_qty.is_zero() {
1431 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
1432 self.generate_order_rejected(
1433 order,
1434 format!("No market for {}", order.instrument_id()).into(),
1435 );
1436 }
1437 return;
1438 }
1439
1440 self.fill_order(
1441 order,
1442 fill_px,
1443 *fill_qty,
1444 liquidity_side,
1445 venue_position_id,
1446 position.clone(),
1447 );
1448
1449 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
1450 return;
1452 }
1453 }
1454
1455 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
1456 self.cancel_order(order, None);
1458 return;
1459 }
1460
1461 if order.is_open()
1462 && self.book_type == BookType::L1_MBP
1463 && matches!(
1464 order.order_type(),
1465 OrderType::Market | OrderType::MarketIfTouched | OrderType::StopMarket
1466 )
1467 {
1468 todo!("Exhausted simulated book volume")
1472 }
1473 }
1474
1475 fn fill_order(
1476 &mut self,
1477 order: &mut OrderAny,
1478 last_px: Price,
1479 last_qty: Quantity,
1480 liquidity_side: LiquiditySide,
1481 venue_position_id: Option<PositionId>,
1482 position: Option<Position>,
1483 ) {
1484 match self.cached_filled_qty.get(&order.client_order_id()) {
1485 Some(filled_qty) => {
1486 let leaves_qty = order.quantity() - *filled_qty;
1487 let last_qty = min(last_qty, leaves_qty);
1488 let new_filled_qty = *filled_qty + last_qty;
1489 self.cached_filled_qty
1491 .insert(order.client_order_id(), new_filled_qty);
1492 }
1493 None => {
1494 self.cached_filled_qty
1495 .insert(order.client_order_id(), last_qty);
1496 }
1497 }
1498
1499 let commission = self
1501 .fee_model
1502 .get_commission(order, last_qty, last_px, &self.instrument)
1503 .unwrap();
1504
1505 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1506 self.generate_order_filled(
1507 order,
1508 venue_order_id,
1509 venue_position_id,
1510 last_qty,
1511 last_px,
1512 self.instrument.quote_currency(),
1513 commission,
1514 liquidity_side,
1515 );
1516
1517 if order.is_passive() && order.is_closed() {
1518 if self.core.order_exists(order.client_order_id()) {
1520 let _ = self
1521 .core
1522 .delete_order(&PassiveOrderAny::from(order.clone()));
1523 }
1524 self.cached_filled_qty.remove(&order.client_order_id());
1525 }
1526
1527 if !self.config.support_contingent_orders {
1528 return;
1529 }
1530
1531 if let Some(contingency_type) = order.contingency_type() {
1532 match contingency_type {
1533 ContingencyType::Oto => {
1534 if let Some(linked_orders_ids) = order.linked_order_ids() {
1535 for client_order_id in linked_orders_ids {
1536 let mut child_order = match self.cache.borrow().order(client_order_id) {
1537 Some(child_order) => child_order.clone(),
1538 None => panic!("Order {client_order_id} not found in cache"),
1539 };
1540
1541 if child_order.is_closed() || child_order.is_active_local() {
1542 continue;
1543 }
1544
1545 if let (None, Some(position_id)) =
1547 (child_order.position_id(), order.position_id())
1548 {
1549 self.cache
1550 .borrow_mut()
1551 .add_position_id(
1552 &position_id,
1553 &self.venue,
1554 client_order_id,
1555 &child_order.strategy_id(),
1556 )
1557 .unwrap();
1558 log::debug!(
1559 "Added position id {position_id} to cache for order {client_order_id}"
1560 );
1561 }
1562
1563 if (!child_order.is_open())
1564 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
1565 && child_order
1566 .previous_status()
1567 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
1568 {
1569 let account_id = order.account_id().unwrap_or_else(|| {
1570 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
1571 panic!(
1572 "Account ID not found for trader {}",
1573 order.trader_id()
1574 )
1575 })
1576 });
1577 self.process_order(&mut child_order, account_id);
1578 }
1579 }
1580 } else {
1581 log::error!(
1582 "OTO order {} does not have linked orders",
1583 order.client_order_id()
1584 );
1585 }
1586 }
1587 ContingencyType::Oco => {
1588 if let Some(linked_orders_ids) = order.linked_order_ids() {
1589 for client_order_id in linked_orders_ids {
1590 let child_order = match self.cache.borrow().order(client_order_id) {
1591 Some(child_order) => child_order.clone(),
1592 None => panic!("Order {client_order_id} not found in cache"),
1593 };
1594
1595 if child_order.is_closed() || child_order.is_active_local() {
1596 continue;
1597 }
1598
1599 self.cancel_order(&child_order, None);
1600 }
1601 } else {
1602 log::error!(
1603 "OCO order {} does not have linked orders",
1604 order.client_order_id()
1605 );
1606 }
1607 }
1608 ContingencyType::Ouo => {
1609 if let Some(linked_orders_ids) = order.linked_order_ids() {
1610 for client_order_id in linked_orders_ids {
1611 let mut child_order = match self.cache.borrow().order(client_order_id) {
1612 Some(child_order) => child_order.clone(),
1613 None => panic!("Order {client_order_id} not found in cache"),
1614 };
1615
1616 if child_order.is_active_local() {
1617 continue;
1618 }
1619
1620 if order.is_closed() && child_order.is_open() {
1621 self.cancel_order(&child_order, None);
1622 } else if !order.leaves_qty().is_zero()
1623 && order.leaves_qty() != child_order.leaves_qty()
1624 {
1625 let price = child_order.price();
1626 let trigger_price = child_order.trigger_price();
1627 self.update_order(
1628 &mut child_order,
1629 Some(order.leaves_qty()),
1630 price,
1631 trigger_price,
1632 Some(false),
1633 );
1634 }
1635 }
1636 } else {
1637 log::error!(
1638 "OUO order {} does not have linked orders",
1639 order.client_order_id()
1640 );
1641 }
1642 }
1643 _ => {}
1644 }
1645 }
1646 }
1647
1648 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
1649 if self
1650 .core
1651 .is_limit_matched(order.order_side_specified(), price)
1652 {
1653 if order.is_post_only() {
1654 self.generate_order_modify_rejected(
1655 order.trader_id(),
1656 order.strategy_id(),
1657 order.instrument_id(),
1658 order.client_order_id(),
1659 Ustr::from(format!(
1660 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1661 order.order_type(),
1662 order.order_side(),
1663 price,
1664 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1665 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1666 ).as_str()),
1667 order.venue_order_id(),
1668 order.account_id(),
1669 );
1670 return;
1671 }
1672
1673 self.generate_order_updated(order, quantity, Some(price), None);
1674 order.set_liquidity_side(LiquiditySide::Taker);
1675 self.fill_limit_order(order);
1676 return;
1677 }
1678 self.generate_order_updated(order, quantity, Some(price), None);
1679 }
1680
1681 fn update_stop_market_order(
1682 &mut self,
1683 order: &mut OrderAny,
1684 quantity: Quantity,
1685 trigger_price: Price,
1686 ) {
1687 if self
1688 .core
1689 .is_stop_matched(order.order_side_specified(), trigger_price)
1690 {
1691 self.generate_order_modify_rejected(
1692 order.trader_id(),
1693 order.strategy_id(),
1694 order.instrument_id(),
1695 order.client_order_id(),
1696 Ustr::from(
1697 format!(
1698 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1699 order.order_type(),
1700 order.order_side(),
1701 trigger_price,
1702 self.core
1703 .bid
1704 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1705 self.core
1706 .ask
1707 .map_or_else(|| "None".to_string(), |p| p.to_string())
1708 )
1709 .as_str(),
1710 ),
1711 order.venue_order_id(),
1712 order.account_id(),
1713 );
1714 return;
1715 }
1716
1717 self.generate_order_updated(order, quantity, None, Some(trigger_price));
1718 }
1719
1720 fn update_stop_limit_order(
1721 &mut self,
1722 order: &mut OrderAny,
1723 quantity: Quantity,
1724 price: Price,
1725 trigger_price: Price,
1726 ) {
1727 if order.is_triggered().is_some_and(|t| t) {
1728 if self
1730 .core
1731 .is_limit_matched(order.order_side_specified(), price)
1732 {
1733 if order.is_post_only() {
1734 self.generate_order_modify_rejected(
1735 order.trader_id(),
1736 order.strategy_id(),
1737 order.instrument_id(),
1738 order.client_order_id(),
1739 Ustr::from(format!(
1740 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1741 order.order_type(),
1742 order.order_side(),
1743 price,
1744 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1745 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1746 ).as_str()),
1747 order.venue_order_id(),
1748 order.account_id(),
1749 );
1750 return;
1751 }
1752 self.generate_order_updated(order, quantity, Some(price), None);
1753 order.set_liquidity_side(LiquiditySide::Taker);
1754 self.fill_limit_order(order);
1755 return; }
1757 } else {
1758 if self
1760 .core
1761 .is_stop_matched(order.order_side_specified(), trigger_price)
1762 {
1763 self.generate_order_modify_rejected(
1764 order.trader_id(),
1765 order.strategy_id(),
1766 order.instrument_id(),
1767 order.client_order_id(),
1768 Ustr::from(
1769 format!(
1770 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1771 order.order_type(),
1772 order.order_side(),
1773 trigger_price,
1774 self.core
1775 .bid
1776 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1777 self.core
1778 .ask
1779 .map_or_else(|| "None".to_string(), |p| p.to_string())
1780 )
1781 .as_str(),
1782 ),
1783 order.venue_order_id(),
1784 order.account_id(),
1785 );
1786 return;
1787 }
1788 }
1789
1790 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1791 }
1792
1793 fn update_market_if_touched_order(
1794 &mut self,
1795 order: &mut OrderAny,
1796 quantity: Quantity,
1797 trigger_price: Price,
1798 ) {
1799 if self
1800 .core
1801 .is_touch_triggered(order.order_side_specified(), trigger_price)
1802 {
1803 self.generate_order_modify_rejected(
1804 order.trader_id(),
1805 order.strategy_id(),
1806 order.instrument_id(),
1807 order.client_order_id(),
1808 Ustr::from(
1809 format!(
1810 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1811 order.order_type(),
1812 order.order_side(),
1813 trigger_price,
1814 self.core
1815 .bid
1816 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1817 self.core
1818 .ask
1819 .map_or_else(|| "None".to_string(), |p| p.to_string())
1820 )
1821 .as_str(),
1822 ),
1823 order.venue_order_id(),
1824 order.account_id(),
1825 );
1826 return;
1828 }
1829
1830 self.generate_order_updated(order, quantity, None, Some(trigger_price));
1831 }
1832
1833 fn update_limit_if_touched_order(
1834 &mut self,
1835 order: &mut OrderAny,
1836 quantity: Quantity,
1837 price: Price,
1838 trigger_price: Price,
1839 ) {
1840 if order.is_triggered().is_some_and(|t| t) {
1841 if self
1843 .core
1844 .is_limit_matched(order.order_side_specified(), price)
1845 {
1846 if order.is_post_only() {
1847 self.generate_order_modify_rejected(
1848 order.trader_id(),
1849 order.strategy_id(),
1850 order.instrument_id(),
1851 order.client_order_id(),
1852 Ustr::from(format!(
1853 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1854 order.order_type(),
1855 order.order_side(),
1856 price,
1857 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1858 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1859 ).as_str()),
1860 order.venue_order_id(),
1861 order.account_id(),
1862 );
1863 return;
1865 }
1866 self.generate_order_updated(order, quantity, Some(price), None);
1867 order.set_liquidity_side(LiquiditySide::Taker);
1868 self.fill_limit_order(order);
1869 return;
1870 }
1871 } else {
1872 if self
1874 .core
1875 .is_touch_triggered(order.order_side_specified(), trigger_price)
1876 {
1877 self.generate_order_modify_rejected(
1878 order.trader_id(),
1879 order.strategy_id(),
1880 order.instrument_id(),
1881 order.client_order_id(),
1882 Ustr::from(
1883 format!(
1884 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1885 order.order_type(),
1886 order.order_side(),
1887 trigger_price,
1888 self.core
1889 .bid
1890 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1891 self.core
1892 .ask
1893 .map_or_else(|| "None".to_string(), |p| p.to_string())
1894 )
1895 .as_str(),
1896 ),
1897 order.venue_order_id(),
1898 order.account_id(),
1899 );
1900 return;
1901 }
1902 }
1903
1904 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1905 }
1906
1907 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1908 let (new_trigger_price, new_price) = trailing_stop_calculate(
1909 self.instrument.price_increment(),
1910 order,
1911 self.core.bid,
1912 self.core.ask,
1913 self.core.last,
1914 )
1915 .unwrap();
1916
1917 if new_trigger_price.is_none() && new_price.is_none() {
1918 return;
1919 }
1920
1921 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price);
1922 }
1923
1924 fn accept_order(&mut self, order: &mut OrderAny) {
1927 if order.is_closed() {
1928 return;
1930 }
1931 if order.status() != OrderStatus::Accepted {
1932 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1933 self.generate_order_accepted(order, venue_order_id);
1934
1935 if matches!(
1936 order.order_type(),
1937 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
1938 ) && order.trigger_price().is_none()
1939 {
1940 self.update_trailing_stop_order(order);
1941 }
1942 }
1943
1944 let _ = self.core.add_order(order.to_owned().into());
1945 }
1946
1947 fn expire_order(&mut self, order: &PassiveOrderAny) {
1948 if self.config.support_contingent_orders
1949 && order
1950 .contingency_type()
1951 .is_some_and(|c| c != ContingencyType::NoContingency)
1952 {
1953 self.cancel_contingent_orders(&OrderAny::from(order.clone()));
1954 }
1955
1956 self.generate_order_expired(&order.to_any());
1957 }
1958
1959 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
1960 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
1961 if order.is_active_local() {
1962 log::error!(
1963 "Cannot cancel an order with {} from the matching engine",
1964 order.status()
1965 );
1966 return;
1967 }
1968
1969 if self.core.order_exists(order.client_order_id()) {
1971 let _ = self
1972 .core
1973 .delete_order(&PassiveOrderAny::from(order.clone()));
1974 }
1975 self.cached_filled_qty.remove(&order.client_order_id());
1976
1977 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1978 self.generate_order_canceled(order, venue_order_id);
1979
1980 if self.config.support_contingent_orders
1981 && order.contingency_type().is_some()
1982 && order.contingency_type().unwrap() != ContingencyType::NoContingency
1983 && cancel_contingencies
1984 {
1985 self.cancel_contingent_orders(order);
1986 }
1987 }
1988
1989 fn update_order(
1990 &mut self,
1991 order: &mut OrderAny,
1992 quantity: Option<Quantity>,
1993 price: Option<Price>,
1994 trigger_price: Option<Price>,
1995 update_contingencies: Option<bool>,
1996 ) {
1997 let update_contingencies = update_contingencies.unwrap_or(true);
1998 let quantity = quantity.unwrap_or(order.quantity());
1999
2000 match order {
2001 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2002 let price = price.unwrap_or(order.price().unwrap());
2003 self.update_limit_order(order, quantity, price);
2004 }
2005 OrderAny::StopMarket(_) => {
2006 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2007 self.update_stop_market_order(order, quantity, trigger_price);
2008 }
2009 OrderAny::StopLimit(_) => {
2010 let price = price.unwrap_or(order.price().unwrap());
2011 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2012 self.update_stop_limit_order(order, quantity, price, trigger_price);
2013 }
2014 OrderAny::MarketIfTouched(_) => {
2015 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2016 self.update_market_if_touched_order(order, quantity, trigger_price);
2017 }
2018 OrderAny::LimitIfTouched(_) => {
2019 let price = price.unwrap_or(order.price().unwrap());
2020 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2021 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2022 }
2023 OrderAny::TrailingStopMarket(_) => {
2024 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2025 self.update_market_if_touched_order(order, quantity, trigger_price);
2026 }
2027 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2028 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2029 let trigger_price =
2030 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2031 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2032 }
2033 _ => {
2034 panic!(
2035 "Unsupported order type {} for update_order",
2036 order.order_type()
2037 );
2038 }
2039 }
2040
2041 if self.config.support_contingent_orders
2042 && order
2043 .contingency_type()
2044 .is_some_and(|c| c != ContingencyType::NoContingency)
2045 && update_contingencies
2046 {
2047 self.update_contingent_order(order);
2048 }
2049 }
2050
2051 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
2052 todo!("trigger_stop_order")
2053 }
2054
2055 fn update_contingent_order(&mut self, order: &OrderAny) {
2056 log::debug!("Updating OUO orders from {}", order.client_order_id());
2057 if let Some(linked_order_ids) = order.linked_order_ids() {
2058 for client_order_id in linked_order_ids {
2059 let mut child_order = match self.cache.borrow().order(client_order_id) {
2060 Some(order) => order.clone(),
2061 None => panic!("Order {client_order_id} not found in cache."),
2062 };
2063
2064 if child_order.is_active_local() {
2065 continue;
2066 }
2067
2068 if order.leaves_qty().is_zero() {
2069 self.cancel_order(&child_order, None);
2070 } else if child_order.leaves_qty() != order.leaves_qty() {
2071 let price = child_order.price();
2072 let trigger_price = child_order.trigger_price();
2073 self.update_order(
2074 &mut child_order,
2075 Some(order.leaves_qty()),
2076 price,
2077 trigger_price,
2078 Some(false),
2079 );
2080 }
2081 }
2082 }
2083 }
2084
2085 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2086 if let Some(linked_order_ids) = order.linked_order_ids() {
2087 for client_order_id in linked_order_ids {
2088 let contingent_order = match self.cache.borrow().order(client_order_id) {
2089 Some(order) => order.clone(),
2090 None => panic!("Cannot find contingent order for {client_order_id}"),
2091 };
2092 if contingent_order.is_active_local() {
2093 continue;
2095 }
2096 if !contingent_order.is_closed() {
2097 self.cancel_order(&contingent_order, Some(false));
2098 }
2099 }
2100 }
2101 }
2102
2103 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2106 let ts_now = self.clock.borrow().timestamp_ns();
2107 let account_id = order
2108 .account_id()
2109 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2110
2111 let event = OrderEventAny::Rejected(OrderRejected::new(
2112 order.trader_id(),
2113 order.strategy_id(),
2114 order.instrument_id(),
2115 order.client_order_id(),
2116 account_id,
2117 reason,
2118 UUID4::new(),
2119 ts_now,
2120 ts_now,
2121 false,
2122 ));
2123 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2124 }
2125
2126 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2127 let ts_now = self.clock.borrow().timestamp_ns();
2128 let account_id = order
2129 .account_id()
2130 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2131 let event = OrderEventAny::Accepted(OrderAccepted::new(
2132 order.trader_id(),
2133 order.strategy_id(),
2134 order.instrument_id(),
2135 order.client_order_id(),
2136 venue_order_id,
2137 account_id,
2138 UUID4::new(),
2139 ts_now,
2140 ts_now,
2141 false,
2142 ));
2143 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2144
2145 order.apply(event).expect("Failed to apply order event");
2147 }
2148
2149 #[allow(clippy::too_many_arguments)]
2150 fn generate_order_modify_rejected(
2151 &self,
2152 trader_id: TraderId,
2153 strategy_id: StrategyId,
2154 instrument_id: InstrumentId,
2155 client_order_id: ClientOrderId,
2156 reason: Ustr,
2157 venue_order_id: Option<VenueOrderId>,
2158 account_id: Option<AccountId>,
2159 ) {
2160 let ts_now = self.clock.borrow().timestamp_ns();
2161 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2162 trader_id,
2163 strategy_id,
2164 instrument_id,
2165 client_order_id,
2166 reason,
2167 UUID4::new(),
2168 ts_now,
2169 ts_now,
2170 false,
2171 venue_order_id,
2172 account_id,
2173 ));
2174 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2175 }
2176
2177 #[allow(clippy::too_many_arguments)]
2178 fn generate_order_cancel_rejected(
2179 &self,
2180 trader_id: TraderId,
2181 strategy_id: StrategyId,
2182 account_id: AccountId,
2183 instrument_id: InstrumentId,
2184 client_order_id: ClientOrderId,
2185 venue_order_id: VenueOrderId,
2186 reason: Ustr,
2187 ) {
2188 let ts_now = self.clock.borrow().timestamp_ns();
2189 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
2190 trader_id,
2191 strategy_id,
2192 instrument_id,
2193 client_order_id,
2194 reason,
2195 UUID4::new(),
2196 ts_now,
2197 ts_now,
2198 false,
2199 Some(venue_order_id),
2200 Some(account_id),
2201 ));
2202 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2203 }
2204
2205 fn generate_order_updated(
2206 &self,
2207 order: &mut OrderAny,
2208 quantity: Quantity,
2209 price: Option<Price>,
2210 trigger_price: Option<Price>,
2211 ) {
2212 let ts_now = self.clock.borrow().timestamp_ns();
2213 let event = OrderEventAny::Updated(OrderUpdated::new(
2214 order.trader_id(),
2215 order.strategy_id(),
2216 order.instrument_id(),
2217 order.client_order_id(),
2218 quantity,
2219 UUID4::new(),
2220 ts_now,
2221 ts_now,
2222 false,
2223 order.venue_order_id(),
2224 order.account_id(),
2225 price,
2226 trigger_price,
2227 ));
2228 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2229
2230 order.apply(event).expect("Failed to apply order event");
2232 }
2233
2234 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
2235 let ts_now = self.clock.borrow().timestamp_ns();
2236 let event = OrderEventAny::Canceled(OrderCanceled::new(
2237 order.trader_id(),
2238 order.strategy_id(),
2239 order.instrument_id(),
2240 order.client_order_id(),
2241 UUID4::new(),
2242 ts_now,
2243 ts_now,
2244 false,
2245 Some(venue_order_id),
2246 order.account_id(),
2247 ));
2248 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2249 }
2250
2251 fn generate_order_triggered(&self, order: &OrderAny) {
2252 let ts_now = self.clock.borrow().timestamp_ns();
2253 let event = OrderEventAny::Triggered(OrderTriggered::new(
2254 order.trader_id(),
2255 order.strategy_id(),
2256 order.instrument_id(),
2257 order.client_order_id(),
2258 UUID4::new(),
2259 ts_now,
2260 ts_now,
2261 false,
2262 order.venue_order_id(),
2263 order.account_id(),
2264 ));
2265 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2266 }
2267
2268 fn generate_order_expired(&self, order: &OrderAny) {
2269 let ts_now = self.clock.borrow().timestamp_ns();
2270 let event = OrderEventAny::Expired(OrderExpired::new(
2271 order.trader_id(),
2272 order.strategy_id(),
2273 order.instrument_id(),
2274 order.client_order_id(),
2275 UUID4::new(),
2276 ts_now,
2277 ts_now,
2278 false,
2279 order.venue_order_id(),
2280 order.account_id(),
2281 ));
2282 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2283 }
2284
2285 #[allow(clippy::too_many_arguments)]
2286 fn generate_order_filled(
2287 &mut self,
2288 order: &mut OrderAny,
2289 venue_order_id: VenueOrderId,
2290 venue_position_id: Option<PositionId>,
2291 last_qty: Quantity,
2292 last_px: Price,
2293 quote_currency: Currency,
2294 commission: Money,
2295 liquidity_side: LiquiditySide,
2296 ) {
2297 let ts_now = self.clock.borrow().timestamp_ns();
2298 let account_id = order
2299 .account_id()
2300 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2301 let event = OrderEventAny::Filled(OrderFilled::new(
2302 order.trader_id(),
2303 order.strategy_id(),
2304 order.instrument_id(),
2305 order.client_order_id(),
2306 venue_order_id,
2307 account_id,
2308 self.ids_generator.generate_trade_id(),
2309 order.order_side(),
2310 order.order_type(),
2311 last_qty,
2312 last_px,
2313 quote_currency,
2314 liquidity_side,
2315 UUID4::new(),
2316 ts_now,
2317 ts_now,
2318 false,
2319 venue_position_id,
2320 Some(commission),
2321 ));
2322 msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2323
2324 order.apply(event).expect("Failed to apply order event");
2326 }
2327}