1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::RefCell,
23 cmp::min,
24 collections::HashMap,
25 fmt::Debug,
26 ops::{Add, Sub},
27 rc::Rc,
28};
29
30use chrono::TimeDelta;
31use nautilus_common::{
32 cache::Cache,
33 clock::Clock,
34 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
35 msgbus,
36};
37use nautilus_core::{UUID4, UnixNanos};
38use nautilus_model::{
39 data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
40 enums::{
41 AccountType, AggregationSource, AggressorSide, BookType, ContingencyType, LiquiditySide,
42 MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified, OrderStatus,
43 OrderType, PriceType, TimeInForce,
44 },
45 events::{
46 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
47 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
48 },
49 identifiers::{
50 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
51 VenueOrderId,
52 },
53 instruments::{EXPIRING_INSTRUMENT_CLASSES, Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
56 position::Position,
57 types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
58};
59use ustr::Ustr;
60
61use crate::{
62 matching_core::OrderMatchingCore,
63 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
64 models::{
65 fee::{FeeModel, FeeModelAny},
66 fill::FillModel,
67 },
68 protection::protection_price_calculate,
69 trailing::trailing_stop_calculate,
70};
71
72pub struct OrderMatchingEngine {
74 pub venue: Venue,
76 pub instrument: InstrumentAny,
78 pub raw_id: u32,
80 pub book_type: BookType,
82 pub oms_type: OmsType,
84 pub account_type: AccountType,
86 pub market_status: MarketStatus,
88 pub config: OrderMatchingEngineConfig,
90 clock: Rc<RefCell<dyn Clock>>,
91 cache: Rc<RefCell<Cache>>,
92 book: OrderBook,
93 pub core: OrderMatchingCore,
94 fill_model: FillModel,
95 fee_model: FeeModelAny,
96 target_bid: Option<Price>,
97 target_ask: Option<Price>,
98 target_last: Option<Price>,
99 last_bar_bid: Option<Bar>,
100 last_bar_ask: Option<Bar>,
101 execution_bar_types: HashMap<InstrumentId, BarType>,
102 execution_bar_deltas: HashMap<BarType, TimeDelta>,
103 account_ids: HashMap<TraderId, AccountId>,
104 cached_filled_qty: HashMap<ClientOrderId, Quantity>,
105 ids_generator: IdsGenerator,
106}
107
108impl Debug for OrderMatchingEngine {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 f.debug_struct(stringify!(OrderMatchingEngine))
111 .field("venue", &self.venue)
112 .field("instrument", &self.instrument.id())
113 .finish()
114 }
115}
116
117impl OrderMatchingEngine {
118 #[allow(clippy::too_many_arguments)]
120 pub fn new(
121 instrument: InstrumentAny,
122 raw_id: u32,
123 fill_model: FillModel,
124 fee_model: FeeModelAny,
125 book_type: BookType,
126 oms_type: OmsType,
127 account_type: AccountType,
128 clock: Rc<RefCell<dyn Clock>>,
129 cache: Rc<RefCell<Cache>>,
130 config: OrderMatchingEngineConfig,
131 ) -> Self {
132 let book = OrderBook::new(instrument.id(), book_type);
133 let core = OrderMatchingCore::new(
134 instrument.id(),
135 instrument.price_increment(),
136 None, None, None, );
140 let ids_generator = IdsGenerator::new(
141 instrument.id().venue,
142 oms_type,
143 raw_id,
144 config.use_random_ids,
145 config.use_position_ids,
146 cache.clone(),
147 );
148
149 Self {
150 venue: instrument.id().venue,
151 instrument,
152 raw_id,
153 fill_model,
154 fee_model,
155 book_type,
156 oms_type,
157 account_type,
158 clock,
159 cache,
160 book,
161 core,
162 market_status: MarketStatus::Open,
163 config,
164 target_bid: None,
165 target_ask: None,
166 target_last: None,
167 last_bar_bid: None,
168 last_bar_ask: None,
169 execution_bar_types: HashMap::new(),
170 execution_bar_deltas: HashMap::new(),
171 account_ids: HashMap::new(),
172 cached_filled_qty: HashMap::new(),
173 ids_generator,
174 }
175 }
176
177 pub fn reset(&mut self) {
183 self.book.clear(0, UnixNanos::default());
184 self.execution_bar_types.clear();
185 self.execution_bar_deltas.clear();
186 self.account_ids.clear();
187 self.cached_filled_qty.clear();
188 self.core.reset();
189 self.target_bid = None;
190 self.target_ask = None;
191 self.target_last = None;
192 self.ids_generator.reset();
193
194 log::info!("Reset {}", self.instrument.id());
195 }
196
197 pub const fn set_fill_model(&mut self, fill_model: FillModel) {
199 self.fill_model = fill_model;
200 }
201
202 #[must_use]
203 pub fn best_bid_price(&self) -> Option<Price> {
205 self.book.best_bid_price()
206 }
207
208 #[must_use]
209 pub fn best_ask_price(&self) -> Option<Price> {
211 self.book.best_ask_price()
212 }
213
214 #[must_use]
215 pub const fn get_book(&self) -> &OrderBook {
217 &self.book
218 }
219
220 #[must_use]
221 pub const fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
223 self.core.get_orders_bid()
224 }
225
226 #[must_use]
227 pub const fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
229 self.core.get_orders_ask()
230 }
231
232 #[must_use]
233 pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
235 let mut orders = Vec::new();
237 orders.extend_from_slice(self.core.get_orders_bid());
238 orders.extend_from_slice(self.core.get_orders_ask());
239 orders
240 }
241
242 #[must_use]
243 pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
245 self.core.order_exists(client_order_id)
246 }
247
248 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
256 log::debug!("Processing {delta}");
257
258 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
259 self.book.apply_delta(delta)?;
260 }
261
262 self.iterate(delta.ts_init);
263 Ok(())
264 }
265
266 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
272 log::debug!("Processing {deltas}");
273
274 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
275 self.book.apply_deltas(deltas)?;
276 }
277
278 self.iterate(deltas.ts_init);
279 Ok(())
280 }
281
282 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
288 log::debug!("Processing {quote}");
289
290 let price_prec = self.instrument.price_precision();
291 let size_prec = self.instrument.size_precision();
292 let instrument_id = self.instrument.id();
293
294 assert!(
295 quote.bid_price.precision == price_prec,
296 "Invalid bid_price precision {}, expected {price_prec} for {instrument_id}",
297 quote.bid_price.precision
298 );
299 assert!(
300 quote.ask_price.precision == price_prec,
301 "Invalid ask_price precision {}, expected {price_prec} for {instrument_id}",
302 quote.ask_price.precision
303 );
304 assert!(
305 quote.bid_size.precision == size_prec,
306 "Invalid bid_size precision {}, expected {size_prec} for {instrument_id}",
307 quote.bid_size.precision
308 );
309 assert!(
310 quote.ask_size.precision == size_prec,
311 "Invalid ask_size precision {}, expected {size_prec} for {instrument_id}",
312 quote.ask_size.precision
313 );
314
315 if self.book_type == BookType::L1_MBP {
316 self.book.update_quote_tick(quote).unwrap();
317 }
318
319 self.iterate(quote.ts_init);
320 }
321
322 pub fn process_bar(&mut self, bar: &Bar) {
328 log::debug!("Processing {bar}");
329
330 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
332 return;
333 }
334
335 let bar_type = bar.bar_type;
336 if bar_type.aggregation_source() == AggregationSource::Internal {
338 return;
339 }
340
341 let price_prec = self.instrument.price_precision();
342 let size_prec = self.instrument.size_precision();
343 let instrument_id = self.instrument.id();
344
345 assert!(
346 bar.open.precision == price_prec,
347 "Invalid bar open precision {}, expected {price_prec} for {instrument_id}",
348 bar.open.precision
349 );
350 assert!(
351 bar.high.precision == price_prec,
352 "Invalid bar high precision {}, expected {price_prec} for {instrument_id}",
353 bar.high.precision
354 );
355 assert!(
356 bar.low.precision == price_prec,
357 "Invalid bar low precision {}, expected {price_prec} for {instrument_id}",
358 bar.low.precision
359 );
360 assert!(
361 bar.close.precision == price_prec,
362 "Invalid bar close precision {}, expected {price_prec} for {instrument_id}",
363 bar.close.precision
364 );
365 assert!(
366 bar.volume.precision == size_prec,
367 "Invalid bar volume precision {}, expected {size_prec} for {instrument_id}",
368 bar.volume.precision
369 );
370
371 let execution_bar_type =
372 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
373 execution_bar_type.to_owned()
374 } else {
375 self.execution_bar_types
376 .insert(bar.instrument_id(), bar_type);
377 self.execution_bar_deltas
378 .insert(bar_type, bar_type.spec().timedelta());
379 bar_type
380 };
381
382 if execution_bar_type != bar_type {
383 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
384 if bar_type_timedelta.is_none() {
385 bar_type_timedelta = Some(bar_type.spec().timedelta());
386 self.execution_bar_deltas
387 .insert(bar_type, bar_type_timedelta.unwrap());
388 }
389 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
390 >= &bar_type_timedelta.unwrap()
391 {
392 self.execution_bar_types
393 .insert(bar_type.instrument_id(), bar_type);
394 } else {
395 return;
396 }
397 }
398
399 match bar_type.spec().price_type {
400 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
401 PriceType::Bid => {
402 self.last_bar_bid = Some(bar.to_owned());
403 self.process_quote_ticks_from_bar(bar);
404 }
405 PriceType::Ask => {
406 self.last_bar_ask = Some(bar.to_owned());
407 self.process_quote_ticks_from_bar(bar);
408 }
409 PriceType::Mark => panic!("Not implemented"),
410 }
411 }
412
413 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
414 let quarter_raw = bar.volume.raw / 4;
416 let remainder_raw = bar.volume.raw % 4;
417 let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
418 let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);
419
420 let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
421 {
422 AggressorSide::Buyer
423 } else {
424 AggressorSide::Seller
425 };
426
427 let mut trade_tick = TradeTick::new(
429 bar.instrument_id(),
430 bar.open,
431 size,
432 aggressor_side,
433 self.ids_generator.generate_trade_id(),
434 bar.ts_init,
435 bar.ts_init,
436 );
437
438 if !self.core.is_last_initialized {
441 self.book.update_trade_tick(&trade_tick).unwrap();
442 self.iterate(trade_tick.ts_init);
443 self.core.set_last_raw(trade_tick.price);
444 }
445
446 if self.core.last.is_some_and(|last| bar.high > last) {
449 trade_tick.price = bar.high;
450 trade_tick.aggressor_side = AggressorSide::Buyer;
451 trade_tick.trade_id = self.ids_generator.generate_trade_id();
452
453 self.book.update_trade_tick(&trade_tick).unwrap();
454 self.iterate(trade_tick.ts_init);
455
456 self.core.set_last_raw(trade_tick.price);
457 }
458
459 if self.core.last.is_some_and(|last| bar.low < last) {
463 trade_tick.price = bar.low;
464 trade_tick.aggressor_side = AggressorSide::Seller;
465 trade_tick.trade_id = self.ids_generator.generate_trade_id();
466
467 self.book.update_trade_tick(&trade_tick).unwrap();
468 self.iterate(trade_tick.ts_init);
469
470 self.core.set_last_raw(trade_tick.price);
471 }
472
473 if self.core.last.is_some_and(|last| bar.close != last) {
478 trade_tick.price = bar.close;
479 trade_tick.size = close_size;
480 trade_tick.aggressor_side = if bar.close > self.core.last.unwrap() {
481 AggressorSide::Buyer
482 } else {
483 AggressorSide::Seller
484 };
485 trade_tick.trade_id = self.ids_generator.generate_trade_id();
486
487 self.book.update_trade_tick(&trade_tick).unwrap();
488 self.iterate(trade_tick.ts_init);
489
490 self.core.set_last_raw(trade_tick.price);
491 }
492 }
493
494 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
495 if self.last_bar_bid.is_none()
497 || self.last_bar_ask.is_none()
498 || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
499 {
500 return;
501 }
502 let bid_bar = self.last_bar_bid.unwrap();
503 let ask_bar = self.last_bar_ask.unwrap();
504
505 let bid_quarter = bid_bar.volume.raw / 4;
507 let bid_remainder = bid_bar.volume.raw % 4;
508 let ask_quarter = ask_bar.volume.raw / 4;
509 let ask_remainder = ask_bar.volume.raw % 4;
510
511 let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
512 let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
513 let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
514 let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
515
516 let mut quote_tick = QuoteTick::new(
518 self.book.instrument_id,
519 bid_bar.open,
520 ask_bar.open,
521 bid_size,
522 ask_size,
523 bid_bar.ts_init,
524 bid_bar.ts_init,
525 );
526
527 self.book.update_quote_tick("e_tick).unwrap();
529 self.iterate(quote_tick.ts_init);
530
531 quote_tick.bid_price = bid_bar.high;
533 quote_tick.ask_price = ask_bar.high;
534 self.book.update_quote_tick("e_tick).unwrap();
535 self.iterate(quote_tick.ts_init);
536
537 quote_tick.bid_price = bid_bar.low;
539 quote_tick.ask_price = ask_bar.low;
540 self.book.update_quote_tick("e_tick).unwrap();
541 self.iterate(quote_tick.ts_init);
542
543 quote_tick.bid_price = bid_bar.close;
545 quote_tick.ask_price = ask_bar.close;
546 quote_tick.bid_size = bid_close_size;
547 quote_tick.ask_size = ask_close_size;
548 self.book.update_quote_tick("e_tick).unwrap();
549 self.iterate(quote_tick.ts_init);
550
551 self.last_bar_bid = None;
553 self.last_bar_ask = None;
554 }
555
556 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
562 log::debug!("Processing {trade}");
563
564 let price_prec = self.instrument.price_precision();
565 let size_prec = self.instrument.size_precision();
566 let instrument_id = self.instrument.id();
567
568 assert!(
569 trade.price.precision == price_prec,
570 "Invalid trade price precision {}, expected {price_prec} for {instrument_id}",
571 trade.price.precision
572 );
573 assert!(
574 trade.size.precision == size_prec,
575 "Invalid trade size precision {}, expected {size_prec} for {instrument_id}",
576 trade.size.precision
577 );
578
579 if self.book_type == BookType::L1_MBP {
580 self.book.update_trade_tick(trade).unwrap();
581 }
582 self.core.set_last_raw(trade.price);
583
584 self.iterate(trade.ts_init);
585 }
586
587 pub fn process_status(&mut self, action: MarketStatusAction) {
588 log::debug!("Processing {action}");
589
590 if self.market_status == MarketStatus::Closed
592 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
593 {
594 self.market_status = MarketStatus::Open;
595 }
596 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
598 self.market_status = MarketStatus::Paused;
599 }
600 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
602 self.market_status = MarketStatus::Suspended;
603 }
604 if self.market_status == MarketStatus::Open
606 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
607 {
608 self.market_status = MarketStatus::Closed;
609 }
610 }
611
612 #[allow(clippy::needless_return)]
618 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
619 {
621 let cache_borrow = self.cache.as_ref().borrow();
622
623 if self.core.order_exists(order.client_order_id()) {
624 self.generate_order_rejected(order, "Order already exists".into());
625 return;
626 }
627
628 self.account_ids.insert(order.trader_id(), account_id);
630
631 if EXPIRING_INSTRUMENT_CLASSES.contains(&self.instrument.instrument_class()) {
633 if let Some(activation_ns) = self.instrument.activation_ns()
634 && self.clock.borrow().timestamp_ns() < activation_ns
635 {
636 self.generate_order_rejected(
637 order,
638 format!(
639 "Contract {} is not yet active, activation {}",
640 self.instrument.id(),
641 self.instrument.activation_ns().unwrap()
642 )
643 .into(),
644 );
645 return;
646 }
647 if let Some(expiration_ns) = self.instrument.expiration_ns()
648 && self.clock.borrow().timestamp_ns() >= expiration_ns
649 {
650 self.generate_order_rejected(
651 order,
652 format!(
653 "Contract {} has expired, expiration {}",
654 self.instrument.id(),
655 self.instrument.expiration_ns().unwrap()
656 )
657 .into(),
658 );
659 return;
660 }
661 }
662
663 if self.config.support_contingent_orders {
665 if let Some(parent_order_id) = order.parent_order_id() {
666 let parent_order = cache_borrow.order(&parent_order_id);
667 if parent_order.is_none()
668 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
669 {
670 panic!("OTO parent not found");
671 }
672 if let Some(parent_order) = parent_order {
673 let parent_order_status = parent_order.status();
674 let order_is_open = order.is_open();
675 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
676 self.generate_order_rejected(
677 order,
678 format!("Rejected OTO order from {parent_order_id}").into(),
679 );
680 return;
681 } else if parent_order.status() == OrderStatus::Accepted
682 && parent_order.status() == OrderStatus::Triggered
683 {
684 log::info!(
685 "Pending OTO order {} triggers from {parent_order_id}",
686 order.client_order_id(),
687 );
688 return;
689 }
690 }
691 }
692
693 if let Some(linked_order_ids) = order.linked_order_ids() {
694 for client_order_id in linked_order_ids {
695 match cache_borrow.order(client_order_id) {
696 Some(contingent_order)
697 if (order.contingency_type().unwrap() == ContingencyType::Oco
698 || order.contingency_type().unwrap()
699 == ContingencyType::Ouo)
700 && !order.is_closed()
701 && contingent_order.is_closed() =>
702 {
703 self.generate_order_rejected(
704 order,
705 format!("Contingent order {client_order_id} already closed")
706 .into(),
707 );
708 return;
709 }
710 None => panic!("Cannot find contingent order for {client_order_id}"),
711 _ => {}
712 }
713 }
714 }
715 }
716
717 if order.quantity().precision != self.instrument.size_precision() {
719 self.generate_order_rejected(
720 order,
721 format!(
722 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
723 order.client_order_id(),
724 order.quantity().precision,
725 self.instrument.id(),
726 self.instrument.size_precision()
727 )
728 .into(),
729 );
730 return;
731 }
732
733 if let Some(price) = order.price()
735 && price.precision != self.instrument.price_precision()
736 {
737 self.generate_order_rejected(
738 order,
739 format!(
740 "Invalid order price precision for order {}, was {} when {} price precision is {}",
741 order.client_order_id(),
742 price.precision,
743 self.instrument.id(),
744 self.instrument.price_precision()
745 )
746 .into(),
747 );
748 return;
749 }
750
751 if let Some(trigger_price) = order.trigger_price()
753 && trigger_price.precision != self.instrument.price_precision()
754 {
755 self.generate_order_rejected(
756 order,
757 format!(
758 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
759 order.client_order_id(),
760 trigger_price.precision,
761 self.instrument.id(),
762 self.instrument.price_precision()
763 )
764 .into(),
765 );
766 return;
767 }
768
769 let position: Option<&Position> = cache_borrow
771 .position_for_order(&order.client_order_id())
772 .or_else(|| {
773 if self.oms_type == OmsType::Netting {
774 let position_id = PositionId::new(
775 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
776 );
777 cache_borrow.position(&position_id)
778 } else {
779 None
780 }
781 });
782
783 if order.order_side() == OrderSide::Sell
785 && self.account_type != AccountType::Margin
786 && matches!(self.instrument, InstrumentAny::Equity(_))
787 && (position.is_none()
788 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
789 {
790 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
791 self.generate_order_rejected(
792 order,
793 format!(
794 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
795 )
796 .into(),
797 );
798 return;
799 }
800
801 if self.config.use_reduce_only
803 && order.is_reduce_only()
804 && !order.is_closed()
805 && position.is_none_or(|pos| {
806 pos.is_closed()
807 || (order.is_buy() && pos.is_long())
808 || (order.is_sell() && pos.is_short())
809 })
810 {
811 self.generate_order_rejected(
812 order,
813 format!(
814 "Reduce-only order {} ({}-{}) would have increased position",
815 order.client_order_id(),
816 order.order_type().to_string().to_uppercase(),
817 order.order_side().to_string().to_uppercase()
818 )
819 .into(),
820 );
821 return;
822 }
823 }
824
825 match order.order_type() {
826 OrderType::Market if self.config.price_protection_points.is_some() => {
827 self.process_market_order_with_protection(order);
828 }
829 OrderType::Market => self.process_market_order(order),
830 OrderType::Limit => self.process_limit_order(order),
831 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
832 OrderType::StopMarket if self.config.price_protection_points.is_some() => {
833 self.process_stop_market_order_with_protection(order);
834 }
835 OrderType::StopMarket => self.process_stop_market_order(order),
836 OrderType::StopLimit => self.process_stop_limit_order(order),
837 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
838 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
839 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
840 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
841 }
842 }
843
844 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
845 if let Some(order) = self.core.get_order(command.client_order_id) {
846 self.update_order(
847 &mut order.to_any(),
848 command.quantity,
849 command.price,
850 command.trigger_price,
851 None,
852 );
853 } else {
854 self.generate_order_modify_rejected(
855 command.trader_id,
856 command.strategy_id,
857 command.instrument_id,
858 command.client_order_id,
859 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
860 Some(command.venue_order_id),
861 Some(account_id),
862 );
863 }
864 }
865
866 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
867 match self.core.get_order(command.client_order_id) {
868 Some(passive_order) => {
869 if passive_order.is_inflight() || passive_order.is_open() {
870 self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
871 }
872 }
873 None => self.generate_order_cancel_rejected(
874 command.trader_id,
875 command.strategy_id,
876 account_id,
877 command.instrument_id,
878 command.client_order_id,
879 command.venue_order_id,
880 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
881 ),
882 }
883 }
884
885 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
886 let instrument_id = command.instrument_id;
887 let open_orders = self
888 .cache
889 .borrow()
890 .orders_open(None, Some(&instrument_id), None, None)
891 .into_iter()
892 .cloned()
893 .collect::<Vec<OrderAny>>();
894 for order in open_orders {
895 if command.order_side != OrderSide::NoOrderSide
896 && command.order_side != order.order_side()
897 {
898 continue;
899 }
900 if order.is_inflight() || order.is_open() {
901 self.cancel_order(&order, None);
902 }
903 }
904 }
905
906 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
907 for order in &command.cancels {
908 self.process_cancel(order, account_id);
909 }
910 }
911
912 fn process_market_order(&mut self, order: &mut OrderAny) {
913 if order.time_in_force() == TimeInForce::AtTheOpen
914 || order.time_in_force() == TimeInForce::AtTheClose
915 {
916 log::error!(
917 "Market auction for the time in force {} is currently not supported",
918 order.time_in_force()
919 );
920 return;
921 }
922
923 let order_side = order.order_side();
925 let is_ask_initialized = self.core.is_ask_initialized;
926 let is_bid_initialized = self.core.is_bid_initialized;
927 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
928 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
929 {
930 self.generate_order_rejected(
931 order,
932 format!("No market for {}", order.instrument_id()).into(),
933 );
934 return;
935 }
936
937 self.fill_market_order(order);
938 }
939
940 fn process_market_order_with_protection(&mut self, order: &mut OrderAny) {
941 if order.time_in_force() == TimeInForce::AtTheOpen
942 || order.time_in_force() == TimeInForce::AtTheClose
943 {
944 log::error!(
945 "Market auction for the time in force {} is currently not supported",
946 order.time_in_force()
947 );
948 return;
949 }
950
951 let order_side = order.order_side();
953 let is_ask_initialized = self.core.is_ask_initialized;
954 let is_bid_initialized = self.core.is_bid_initialized;
955 if (order_side == OrderSide::Buy && !self.core.is_ask_initialized)
956 || (order_side == OrderSide::Sell && !self.core.is_bid_initialized)
957 {
958 self.generate_order_rejected(
959 order,
960 format!("No market for {}", order.instrument_id()).into(),
961 );
962 return;
963 }
964
965 self.update_protection_price(order);
966
967 let protection_price = order
968 .price()
969 .expect("Market order with protection must have a protection price");
970
971 self.accept_order(order);
973
974 if self
976 .core
977 .is_limit_matched(order.order_side_specified(), protection_price)
978 {
979 if order.liquidity_side().is_some()
981 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
982 {
983 order.set_liquidity_side(LiquiditySide::Taker);
984 }
985 self.fill_limit_order(order);
986 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
987 self.cancel_order(order, None);
988 }
989 }
990
991 fn process_limit_order(&mut self, order: &mut OrderAny) {
992 let limit_px = order.price().expect("Limit order must have a price");
993 if order.is_post_only()
994 && self
995 .core
996 .is_limit_matched(order.order_side_specified(), limit_px)
997 {
998 self.generate_order_rejected(
999 order,
1000 format!(
1001 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
1002 order.order_type(),
1003 order.order_side(),
1004 order.price().unwrap(),
1005 self.core
1006 .bid
1007 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1008 self.core
1009 .ask
1010 .map_or_else(|| "None".to_string(), |p| p.to_string())
1011 )
1012 .into(),
1013 );
1014 return;
1015 }
1016
1017 self.accept_order(order);
1019
1020 if self
1022 .core
1023 .is_limit_matched(order.order_side_specified(), limit_px)
1024 {
1025 if order.liquidity_side().is_some()
1027 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
1028 {
1029 order.set_liquidity_side(LiquiditySide::Taker);
1030 }
1031 self.fill_limit_order(order);
1032 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1033 self.cancel_order(order, None);
1034 }
1035 }
1036
1037 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
1038 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1040 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1041 {
1042 self.generate_order_rejected(
1043 order,
1044 format!("No market for {}", order.instrument_id()).into(),
1045 );
1046 return;
1047 }
1048
1049 self.fill_market_order(order);
1051
1052 if order.is_open() {
1053 self.accept_order(order);
1054 }
1055 }
1056
1057 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
1058 let stop_px = order
1059 .trigger_price()
1060 .expect("Stop order must have a trigger price");
1061 if self
1062 .core
1063 .is_stop_matched(order.order_side_specified(), stop_px)
1064 {
1065 if self.config.reject_stop_orders {
1066 self.generate_order_rejected(
1067 order,
1068 format!(
1069 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1070 order.order_type(),
1071 order.order_side(),
1072 order.trigger_price().unwrap(),
1073 self.core
1074 .bid
1075 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1076 self.core
1077 .ask
1078 .map_or_else(|| "None".to_string(), |p| p.to_string())
1079 ).into(),
1080 );
1081 return;
1082 }
1083 self.fill_market_order(order);
1084 return;
1085 }
1086
1087 self.accept_order(order);
1089 }
1090
1091 fn process_stop_market_order_with_protection(&mut self, order: &mut OrderAny) {
1092 let stop_px = order
1093 .trigger_price()
1094 .expect("Stop order must have a trigger price");
1095
1096 let order_side = order.order_side();
1097 let is_ask_initialized = self.core.is_ask_initialized;
1098 let is_bid_initialized = self.core.is_bid_initialized;
1099 if (order_side == OrderSide::Buy && !self.core.is_ask_initialized)
1100 || (order_side == OrderSide::Sell && !self.core.is_bid_initialized)
1101 {
1102 self.generate_order_rejected(
1103 order,
1104 format!("No market for {}", order.instrument_id()).into(),
1105 );
1106 return;
1107 }
1108
1109 self.update_protection_price(order);
1110 let protection_price = order
1111 .price()
1112 .expect("Market order with protection must have a protection price");
1113
1114 if self
1115 .core
1116 .is_stop_matched(order.order_side_specified(), stop_px)
1117 {
1118 if self.config.reject_stop_orders {
1119 self.generate_order_rejected(
1120 order,
1121 format!(
1122 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1123 order.order_type(),
1124 order.order_side(),
1125 order.trigger_price().unwrap(),
1126 self.core
1127 .bid
1128 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1129 self.core
1130 .ask
1131 .map_or_else(|| "None".to_string(), |p| p.to_string())
1132 ).into(),
1133 );
1134 return;
1135 } else {
1136 self.accept_order(order);
1138 }
1139
1140 if self
1141 .core
1142 .is_limit_matched(order.order_side_specified(), protection_price)
1143 {
1144 self.fill_limit_order(order);
1145 }
1146 return;
1147 }
1148 self.accept_order(order);
1150 }
1151
1152 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
1153 let stop_px = order
1154 .trigger_price()
1155 .expect("Stop order must have a trigger price");
1156 if self
1157 .core
1158 .is_stop_matched(order.order_side_specified(), stop_px)
1159 {
1160 if self.config.reject_stop_orders {
1161 self.generate_order_rejected(
1162 order,
1163 format!(
1164 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1165 order.order_type(),
1166 order.order_side(),
1167 order.trigger_price().unwrap(),
1168 self.core
1169 .bid
1170 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1171 self.core
1172 .ask
1173 .map_or_else(|| "None".to_string(), |p| p.to_string())
1174 ).into(),
1175 );
1176 return;
1177 }
1178
1179 self.accept_order(order);
1180 self.generate_order_triggered(order);
1181
1182 let limit_px = order.price().expect("Stop limit order must have a price");
1184 if self
1185 .core
1186 .is_limit_matched(order.order_side_specified(), limit_px)
1187 {
1188 order.set_liquidity_side(LiquiditySide::Taker);
1189 self.fill_limit_order(order);
1190 }
1191 }
1192
1193 self.accept_order(order);
1195 }
1196
1197 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
1198 if self
1199 .core
1200 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1201 {
1202 if self.config.reject_stop_orders {
1203 self.generate_order_rejected(
1204 order,
1205 format!(
1206 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1207 order.order_type(),
1208 order.order_side(),
1209 order.trigger_price().unwrap(),
1210 self.core
1211 .bid
1212 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1213 self.core
1214 .ask
1215 .map_or_else(|| "None".to_string(), |p| p.to_string())
1216 ).into(),
1217 );
1218 return;
1219 }
1220 self.fill_market_order(order);
1221 return;
1222 }
1223
1224 self.accept_order(order);
1226 }
1227
1228 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1229 if self
1230 .core
1231 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1232 {
1233 if self.config.reject_stop_orders {
1234 self.generate_order_rejected(
1235 order,
1236 format!(
1237 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1238 order.order_type(),
1239 order.order_side(),
1240 order.trigger_price().unwrap(),
1241 self.core
1242 .bid
1243 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1244 self.core
1245 .ask
1246 .map_or_else(|| "None".to_string(), |p| p.to_string())
1247 ).into(),
1248 );
1249 return;
1250 }
1251 self.accept_order(order);
1252 self.generate_order_triggered(order);
1253
1254 if self
1256 .core
1257 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1258 {
1259 order.set_liquidity_side(LiquiditySide::Taker);
1260 self.fill_limit_order(order);
1261 }
1262 return;
1263 }
1264
1265 self.accept_order(order);
1267 }
1268
1269 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1270 if let Some(trigger_price) = order.trigger_price()
1271 && self
1272 .core
1273 .is_stop_matched(order.order_side_specified(), trigger_price)
1274 {
1275 self.generate_order_rejected(
1276 order,
1277 format!(
1278 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1279 order.order_type(),
1280 order.order_side(),
1281 trigger_price,
1282 self.core
1283 .bid
1284 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1285 self.core
1286 .ask
1287 .map_or_else(|| "None".to_string(), |p| p.to_string())
1288 ).into(),
1289 );
1290 return;
1291 }
1292
1293 self.accept_order(order);
1295 }
1296
1297 pub fn iterate(&mut self, timestamp_ns: UnixNanos) {
1306 if self.book.has_bid() {
1310 self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1311 }
1312 if self.book.has_ask() {
1313 self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1314 }
1315 self.core.iterate();
1316
1317 self.core.bid = self.book.best_bid_price();
1318 self.core.ask = self.book.best_ask_price();
1319
1320 let orders_bid = self.core.get_orders_bid().to_vec();
1321 let orders_ask = self.core.get_orders_ask().to_vec();
1322
1323 self.iterate_orders(timestamp_ns, &orders_bid);
1324 self.iterate_orders(timestamp_ns, &orders_ask);
1325 }
1326
1327 fn maybe_activate_trailing_stop(
1328 &mut self,
1329 order: &mut OrderAny,
1330 bid: Option<Price>,
1331 ask: Option<Price>,
1332 ) -> bool {
1333 match order {
1334 OrderAny::TrailingStopMarket(inner) => {
1335 if inner.is_activated {
1336 return true;
1337 }
1338
1339 if inner.activation_price.is_none() {
1340 let px = match inner.order_side() {
1341 OrderSide::Buy => ask,
1342 OrderSide::Sell => bid,
1343 _ => None,
1344 };
1345 if let Some(p) = px {
1346 inner.activation_price = Some(p);
1347 inner.set_activated();
1348 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1349 log::error!("Failed to update order: {e}");
1350 }
1351 return true;
1352 }
1353 return false;
1354 }
1355
1356 let activation_price = inner.activation_price.unwrap();
1357 let hit = match inner.order_side() {
1358 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1359 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1360 _ => false,
1361 };
1362 if hit {
1363 inner.set_activated();
1364 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1365 log::error!("Failed to update order: {e}");
1366 }
1367 }
1368 hit
1369 }
1370 OrderAny::TrailingStopLimit(inner) => {
1371 if inner.is_activated {
1372 return true;
1373 }
1374
1375 if inner.activation_price.is_none() {
1376 let px = match inner.order_side() {
1377 OrderSide::Buy => ask,
1378 OrderSide::Sell => bid,
1379 _ => None,
1380 };
1381 if let Some(p) = px {
1382 inner.activation_price = Some(p);
1383 inner.set_activated();
1384 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1385 log::error!("Failed to update order: {e}");
1386 }
1387 return true;
1388 }
1389 return false;
1390 }
1391
1392 let activation_price = inner.activation_price.unwrap();
1393 let hit = match inner.order_side() {
1394 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1395 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1396 _ => false,
1397 };
1398 if hit {
1399 inner.set_activated();
1400 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1401 log::error!("Failed to update order: {e}");
1402 }
1403 }
1404 hit
1405 }
1406 _ => true,
1407 }
1408 }
1409
1410 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1411 for order in orders {
1412 if order.is_closed() {
1413 continue;
1414 }
1415
1416 if self.config.support_gtd_orders
1417 && order
1418 .expire_time()
1419 .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
1420 {
1421 let _ = self.core.delete_order(order);
1422 self.cached_filled_qty.remove(&order.client_order_id());
1423 self.expire_order(order);
1424 continue;
1425 }
1426
1427 if matches!(
1428 order,
1429 PassiveOrderAny::Stop(
1430 StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_)
1431 )
1432 ) {
1433 let mut any = OrderAny::from(order.clone());
1434
1435 if !self.maybe_activate_trailing_stop(&mut any, self.core.bid, self.core.ask) {
1436 continue;
1437 }
1438
1439 self.update_trailing_stop_order(&mut any);
1440 }
1441
1442 if let Some(target_bid) = self.target_bid {
1444 self.core.bid = Some(target_bid);
1445 self.target_bid = None;
1446 }
1447 if let Some(target_bid) = self.target_bid.take() {
1448 self.core.bid = Some(target_bid);
1449 self.target_bid = None;
1450 }
1451 if let Some(target_ask) = self.target_ask.take() {
1452 self.core.ask = Some(target_ask);
1453 self.target_ask = None;
1454 }
1455 if let Some(target_last) = self.target_last.take() {
1456 self.core.last = Some(target_last);
1457 self.target_last = None;
1458 }
1459 }
1460
1461 self.target_bid = None;
1463 self.target_ask = None;
1464 self.target_last = None;
1465 }
1466
1467 fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1468 match order.price() {
1469 Some(order_price) => {
1470 let book_order =
1472 BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
1473
1474 let mut fills = self.book.simulate_fills(&book_order);
1475
1476 if fills.is_empty() {
1478 return fills;
1479 }
1480
1481 if let Some(triggered_price) = order.trigger_price() {
1483 if order
1485 .liquidity_side()
1486 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
1487 {
1488 if order.order_side() == OrderSide::Sell && order_price > triggered_price {
1489 let first_fill = fills.first().unwrap();
1491 let triggered_qty = first_fill.1;
1492 fills[0] = (triggered_price, triggered_qty);
1493 self.target_bid = self.core.bid;
1494 self.target_ask = self.core.ask;
1495 self.target_last = self.core.last;
1496 self.core.set_ask_raw(order_price);
1497 self.core.set_last_raw(order_price);
1498 } else if order.order_side() == OrderSide::Buy
1499 && order_price < triggered_price
1500 {
1501 let first_fill = fills.first().unwrap();
1503 let triggered_qty = first_fill.1;
1504 fills[0] = (triggered_price, triggered_qty);
1505 self.target_bid = self.core.bid;
1506 self.target_ask = self.core.ask;
1507 self.target_last = self.core.last;
1508 self.core.set_bid_raw(order_price);
1509 self.core.set_last_raw(order_price);
1510 }
1511 }
1512 }
1513
1514 if order
1516 .liquidity_side()
1517 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1518 {
1519 match order.order_side().as_specified() {
1520 OrderSideSpecified::Buy => {
1521 let target_price = if order
1522 .trigger_price()
1523 .is_some_and(|trigger_price| order_price > trigger_price)
1524 {
1525 order.trigger_price().unwrap()
1526 } else {
1527 order_price
1528 };
1529 for fill in &fills {
1530 let last_px = fill.0;
1531 if last_px < order_price {
1532 self.target_bid = self.core.bid;
1534 self.target_ask = self.core.ask;
1535 self.target_last = self.core.last;
1536 self.core.set_ask_raw(target_price);
1537 self.core.set_last_raw(target_price);
1538 }
1539 }
1540 }
1541 OrderSideSpecified::Sell => {
1542 let target_price = if order
1543 .trigger_price()
1544 .is_some_and(|trigger_price| order_price < trigger_price)
1545 {
1546 order.trigger_price().unwrap()
1547 } else {
1548 order_price
1549 };
1550 for fill in &fills {
1551 let last_px = fill.0;
1552 if last_px > order_price {
1553 self.target_bid = self.core.bid;
1555 self.target_ask = self.core.ask;
1556 self.target_last = self.core.last;
1557 self.core.set_bid_raw(target_price);
1558 self.core.set_last_raw(target_price);
1559 }
1560 }
1561 }
1562 }
1563 }
1564
1565 fills
1566 }
1567 None => panic!("Limit order must have a price"),
1568 }
1569 }
1570
1571 fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1572 let price = match order.order_side().as_specified() {
1574 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1575 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1576 };
1577
1578 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1580 self.book.simulate_fills(&book_order)
1581 }
1582
1583 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1584 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
1585 && filled_qty >= &order.quantity()
1586 {
1587 log::info!(
1588 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1589 filled_qty,
1590 order.quantity(),
1591 order.filled_qty(),
1592 order.quantity()
1593 );
1594 return;
1595 }
1596
1597 let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1598 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1599 let cache = self.cache.as_ref().borrow();
1600 cache.position(&venue_position_id).cloned()
1601 } else {
1602 None
1603 };
1604
1605 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1606 log::warn!(
1607 "Canceling REDUCE_ONLY {} as would increase position",
1608 order.order_type()
1609 );
1610 self.cancel_order(order, None);
1611 return;
1612 }
1613 order.set_liquidity_side(LiquiditySide::Taker);
1615 let fills = self.determine_market_price_and_volume(order);
1616 self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1617 }
1618
1619 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1623 match order.price() {
1624 Some(order_price) => {
1625 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1626 if let Some(&qty) = cached_filled_qty
1627 && qty >= order.quantity()
1628 {
1629 log::debug!(
1630 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1631 qty,
1632 order.quantity(),
1633 order.filled_qty(),
1634 order.leaves_qty(),
1635 );
1636 return;
1637 }
1638
1639 if order
1640 .liquidity_side()
1641 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1642 {
1643 if order.order_side() == OrderSide::Buy
1644 && self.core.bid.is_some_and(|bid| bid == order_price)
1645 && !self.fill_model.is_limit_filled()
1646 {
1647 return;
1649 }
1650 if order.order_side() == OrderSide::Sell
1651 && self.core.ask.is_some_and(|ask| ask == order_price)
1652 && !self.fill_model.is_limit_filled()
1653 {
1654 return;
1656 }
1657 }
1658
1659 let venue_position_id = self.ids_generator.get_position_id(order, None);
1660 let position = if let Some(venue_position_id) = venue_position_id {
1661 let cache = self.cache.as_ref().borrow();
1662 cache.position(&venue_position_id).cloned()
1663 } else {
1664 None
1665 };
1666
1667 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1668 log::warn!(
1669 "Canceling REDUCE_ONLY {} as would increase position",
1670 order.order_type()
1671 );
1672 self.cancel_order(order, None);
1673 return;
1674 }
1675
1676 let fills = self.determine_limit_price_and_volume(order);
1677
1678 self.apply_fills(
1679 order,
1680 fills,
1681 order.liquidity_side().unwrap(),
1682 venue_position_id,
1683 position,
1684 );
1685 }
1686 None => panic!("Limit order must have a price"),
1687 }
1688 }
1689
1690 fn apply_fills(
1691 &mut self,
1692 order: &mut OrderAny,
1693 fills: Vec<(Price, Quantity)>,
1694 liquidity_side: LiquiditySide,
1695 venue_position_id: Option<PositionId>,
1696 position: Option<Position>,
1697 ) {
1698 if order.time_in_force() == TimeInForce::Fok {
1699 let mut total_size = Quantity::zero(order.quantity().precision);
1700 for (fill_px, fill_qty) in &fills {
1701 total_size = total_size.add(*fill_qty);
1702 }
1703
1704 if order.leaves_qty() > total_size {
1705 self.cancel_order(order, None);
1706 return;
1707 }
1708 }
1709
1710 if fills.is_empty() {
1711 if order.status() == OrderStatus::Submitted {
1712 self.generate_order_rejected(
1713 order,
1714 format!("No market for {}", order.instrument_id()).into(),
1715 );
1716 } else {
1717 log::error!(
1718 "Cannot fill order: no fills from book when fills were expected (check size in data)"
1719 );
1720 return;
1721 }
1722 }
1723
1724 if self.oms_type == OmsType::Netting {
1725 let venue_position_id: Option<PositionId> = None;
1726 }
1727
1728 let mut initial_market_to_limit_fill = false;
1729
1730 for &(mut fill_px, ref fill_qty) in &fills {
1731 assert!(
1732 (fill_px.precision == self.instrument.price_precision()),
1733 "Invalid price precision for fill price {} when instrument price precision is {}.\
1734 Check that the data price precision matches the {} instrument",
1735 fill_px.precision,
1736 self.instrument.price_precision(),
1737 self.instrument.id()
1738 );
1739
1740 assert!(
1741 (fill_qty.precision == self.instrument.size_precision()),
1742 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
1743 Check that the data quantity precision matches the {} instrument",
1744 fill_qty.precision,
1745 self.instrument.size_precision(),
1746 self.instrument.id()
1747 );
1748
1749 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
1750 && order.order_type() == OrderType::MarketToLimit
1751 {
1752 self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
1753 initial_market_to_limit_fill = true;
1754 }
1755
1756 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
1757 fill_px = match order.order_side().as_specified() {
1758 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
1759 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
1760 }
1761 }
1762
1763 let mut effective_fill_qty = *fill_qty;
1767
1768 if self.config.use_reduce_only
1769 && order.is_reduce_only()
1770 && let Some(position) = &position
1771 && *fill_qty > position.quantity
1772 {
1773 if position.quantity == Quantity::zero(position.quantity.precision) {
1774 return;
1776 }
1777
1778 let adjusted_fill_qty =
1780 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
1781
1782 effective_fill_qty = std::cmp::min(effective_fill_qty, adjusted_fill_qty);
1784
1785 if order.quantity() != adjusted_fill_qty {
1787 self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
1788 }
1789 }
1790
1791 if fill_qty.is_zero() {
1792 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
1793 self.generate_order_rejected(
1794 order,
1795 format!("No market for {}", order.instrument_id()).into(),
1796 );
1797 }
1798 return;
1799 }
1800
1801 self.fill_order(
1802 order,
1803 fill_px,
1804 effective_fill_qty,
1805 liquidity_side,
1806 venue_position_id,
1807 position.clone(),
1808 );
1809
1810 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
1811 return;
1813 }
1814 }
1815
1816 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
1817 self.cancel_order(order, None);
1819 return;
1820 }
1821
1822 if order.is_open()
1823 && self.book_type == BookType::L1_MBP
1824 && matches!(
1825 order.order_type(),
1826 OrderType::Market
1827 | OrderType::MarketIfTouched
1828 | OrderType::StopMarket
1829 | OrderType::TrailingStopMarket
1830 )
1831 {
1832 todo!("Exhausted simulated book volume")
1836 }
1837 }
1838
1839 fn fill_order(
1840 &mut self,
1841 order: &mut OrderAny,
1842 last_px: Price,
1843 last_qty: Quantity,
1844 liquidity_side: LiquiditySide,
1845 venue_position_id: Option<PositionId>,
1846 position: Option<Position>,
1847 ) {
1848 let size_prec = self.instrument.size_precision();
1849 let instrument_id = self.instrument.id();
1850 assert!(
1851 last_qty.precision == size_prec,
1852 "Invalid fill quantity precision {}, expected {size_prec} for {instrument_id}",
1853 last_qty.precision
1854 );
1855
1856 match self.cached_filled_qty.get(&order.client_order_id()) {
1857 Some(filled_qty) => {
1858 let leaves_qty = order.quantity().saturating_sub(*filled_qty);
1860 let last_qty = min(last_qty, leaves_qty);
1861 let new_filled_qty = *filled_qty + last_qty;
1862 self.cached_filled_qty
1864 .insert(order.client_order_id(), new_filled_qty);
1865 }
1866 None => {
1867 self.cached_filled_qty
1868 .insert(order.client_order_id(), last_qty);
1869 }
1870 }
1871
1872 let commission = self
1874 .fee_model
1875 .get_commission(order, last_qty, last_px, &self.instrument)
1876 .unwrap();
1877
1878 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1879 self.generate_order_filled(
1880 order,
1881 venue_order_id,
1882 venue_position_id,
1883 last_qty,
1884 last_px,
1885 self.instrument.quote_currency(),
1886 commission,
1887 liquidity_side,
1888 );
1889
1890 if order.is_passive() && order.is_closed() {
1891 if self.core.order_exists(order.client_order_id()) {
1893 let _ = self.core.delete_order(
1894 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
1895 );
1896 }
1897 self.cached_filled_qty.remove(&order.client_order_id());
1898 }
1899
1900 if !self.config.support_contingent_orders {
1901 return;
1902 }
1903
1904 if let Some(contingency_type) = order.contingency_type() {
1905 match contingency_type {
1906 ContingencyType::Oto => {
1907 if let Some(linked_orders_ids) = order.linked_order_ids() {
1908 for client_order_id in linked_orders_ids {
1909 let mut child_order = match self.cache.borrow().order(client_order_id) {
1910 Some(child_order) => child_order.clone(),
1911 None => panic!("Order {client_order_id} not found in cache"),
1912 };
1913
1914 if child_order.is_closed() || child_order.is_active_local() {
1915 continue;
1916 }
1917
1918 if let (None, Some(position_id)) =
1920 (child_order.position_id(), order.position_id())
1921 {
1922 self.cache
1923 .borrow_mut()
1924 .add_position_id(
1925 &position_id,
1926 &self.venue,
1927 client_order_id,
1928 &child_order.strategy_id(),
1929 )
1930 .unwrap();
1931 log::debug!(
1932 "Added position id {position_id} to cache for order {client_order_id}"
1933 );
1934 }
1935
1936 if (!child_order.is_open())
1937 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
1938 && child_order
1939 .previous_status()
1940 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
1941 {
1942 let account_id = order.account_id().unwrap_or_else(|| {
1943 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
1944 panic!(
1945 "Account ID not found for trader {}",
1946 order.trader_id()
1947 )
1948 })
1949 });
1950 self.process_order(&mut child_order, account_id);
1951 }
1952 }
1953 } else {
1954 log::error!(
1955 "OTO order {} does not have linked orders",
1956 order.client_order_id()
1957 );
1958 }
1959 }
1960 ContingencyType::Oco => {
1961 if let Some(linked_orders_ids) = order.linked_order_ids() {
1962 for client_order_id in linked_orders_ids {
1963 let child_order = match self.cache.borrow().order(client_order_id) {
1964 Some(child_order) => child_order.clone(),
1965 None => panic!("Order {client_order_id} not found in cache"),
1966 };
1967
1968 if child_order.is_closed() || child_order.is_active_local() {
1969 continue;
1970 }
1971
1972 self.cancel_order(&child_order, None);
1973 }
1974 } else {
1975 log::error!(
1976 "OCO order {} does not have linked orders",
1977 order.client_order_id()
1978 );
1979 }
1980 }
1981 ContingencyType::Ouo => {
1982 if let Some(linked_orders_ids) = order.linked_order_ids() {
1983 for client_order_id in linked_orders_ids {
1984 let mut child_order = match self.cache.borrow().order(client_order_id) {
1985 Some(child_order) => child_order.clone(),
1986 None => panic!("Order {client_order_id} not found in cache"),
1987 };
1988
1989 if child_order.is_active_local() {
1990 continue;
1991 }
1992
1993 if order.is_closed() && child_order.is_open() {
1994 self.cancel_order(&child_order, None);
1995 } else if !order.leaves_qty().is_zero()
1996 && order.leaves_qty() != child_order.leaves_qty()
1997 {
1998 let price = child_order.price();
1999 let trigger_price = child_order.trigger_price();
2000 self.update_order(
2001 &mut child_order,
2002 Some(order.leaves_qty()),
2003 price,
2004 trigger_price,
2005 Some(false),
2006 );
2007 }
2008 }
2009 } else {
2010 log::error!(
2011 "OUO order {} does not have linked orders",
2012 order.client_order_id()
2013 );
2014 }
2015 }
2016 _ => {}
2017 }
2018 }
2019 }
2020
2021 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
2022 if self
2023 .core
2024 .is_limit_matched(order.order_side_specified(), price)
2025 {
2026 if order.is_post_only() {
2027 self.generate_order_modify_rejected(
2028 order.trader_id(),
2029 order.strategy_id(),
2030 order.instrument_id(),
2031 order.client_order_id(),
2032 Ustr::from(format!(
2033 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2034 order.order_type(),
2035 order.order_side(),
2036 price,
2037 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2038 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2039 ).as_str()),
2040 order.venue_order_id(),
2041 order.account_id(),
2042 );
2043 return;
2044 }
2045
2046 self.generate_order_updated(order, quantity, Some(price), None, None);
2047 order.set_liquidity_side(LiquiditySide::Taker);
2048 self.fill_limit_order(order);
2049 return;
2050 }
2051 self.generate_order_updated(order, quantity, Some(price), None, None);
2052 }
2053
2054 fn update_stop_market_order(
2055 &mut self,
2056 order: &mut OrderAny,
2057 quantity: Quantity,
2058 trigger_price: Price,
2059 ) {
2060 if self
2061 .core
2062 .is_stop_matched(order.order_side_specified(), trigger_price)
2063 {
2064 self.generate_order_modify_rejected(
2065 order.trader_id(),
2066 order.strategy_id(),
2067 order.instrument_id(),
2068 order.client_order_id(),
2069 Ustr::from(
2070 format!(
2071 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
2072 order.order_type(),
2073 order.order_side(),
2074 trigger_price,
2075 self.core
2076 .bid
2077 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2078 self.core
2079 .ask
2080 .map_or_else(|| "None".to_string(), |p| p.to_string())
2081 )
2082 .as_str(),
2083 ),
2084 order.venue_order_id(),
2085 order.account_id(),
2086 );
2087 return;
2088 }
2089
2090 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
2091 }
2092
2093 fn update_stop_limit_order(
2094 &mut self,
2095 order: &mut OrderAny,
2096 quantity: Quantity,
2097 price: Price,
2098 trigger_price: Price,
2099 ) {
2100 if order.is_triggered().is_some_and(|t| t) {
2101 if self
2103 .core
2104 .is_limit_matched(order.order_side_specified(), price)
2105 {
2106 if order.is_post_only() {
2107 self.generate_order_modify_rejected(
2108 order.trader_id(),
2109 order.strategy_id(),
2110 order.instrument_id(),
2111 order.client_order_id(),
2112 Ustr::from(format!(
2113 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2114 order.order_type(),
2115 order.order_side(),
2116 price,
2117 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2118 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2119 ).as_str()),
2120 order.venue_order_id(),
2121 order.account_id(),
2122 );
2123 return;
2124 }
2125 self.generate_order_updated(order, quantity, Some(price), None, None);
2126 order.set_liquidity_side(LiquiditySide::Taker);
2127 self.fill_limit_order(order);
2128 return; }
2130 } else {
2131 if self
2133 .core
2134 .is_stop_matched(order.order_side_specified(), trigger_price)
2135 {
2136 self.generate_order_modify_rejected(
2137 order.trader_id(),
2138 order.strategy_id(),
2139 order.instrument_id(),
2140 order.client_order_id(),
2141 Ustr::from(
2142 format!(
2143 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
2144 order.order_type(),
2145 order.order_side(),
2146 trigger_price,
2147 self.core
2148 .bid
2149 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2150 self.core
2151 .ask
2152 .map_or_else(|| "None".to_string(), |p| p.to_string())
2153 )
2154 .as_str(),
2155 ),
2156 order.venue_order_id(),
2157 order.account_id(),
2158 );
2159 return;
2160 }
2161 }
2162
2163 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
2164 }
2165
2166 fn update_market_if_touched_order(
2167 &mut self,
2168 order: &mut OrderAny,
2169 quantity: Quantity,
2170 trigger_price: Price,
2171 ) {
2172 if self
2173 .core
2174 .is_touch_triggered(order.order_side_specified(), trigger_price)
2175 {
2176 self.generate_order_modify_rejected(
2177 order.trader_id(),
2178 order.strategy_id(),
2179 order.instrument_id(),
2180 order.client_order_id(),
2181 Ustr::from(
2182 format!(
2183 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2184 order.order_type(),
2185 order.order_side(),
2186 trigger_price,
2187 self.core
2188 .bid
2189 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2190 self.core
2191 .ask
2192 .map_or_else(|| "None".to_string(), |p| p.to_string())
2193 )
2194 .as_str(),
2195 ),
2196 order.venue_order_id(),
2197 order.account_id(),
2198 );
2199 return;
2201 }
2202
2203 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
2204 }
2205
2206 fn update_limit_if_touched_order(
2207 &mut self,
2208 order: &mut OrderAny,
2209 quantity: Quantity,
2210 price: Price,
2211 trigger_price: Price,
2212 ) {
2213 if order.is_triggered().is_some_and(|t| t) {
2214 if self
2216 .core
2217 .is_limit_matched(order.order_side_specified(), price)
2218 {
2219 if order.is_post_only() {
2220 self.generate_order_modify_rejected(
2221 order.trader_id(),
2222 order.strategy_id(),
2223 order.instrument_id(),
2224 order.client_order_id(),
2225 Ustr::from(format!(
2226 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2227 order.order_type(),
2228 order.order_side(),
2229 price,
2230 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2231 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2232 ).as_str()),
2233 order.venue_order_id(),
2234 order.account_id(),
2235 );
2236 return;
2238 }
2239 self.generate_order_updated(order, quantity, Some(price), None, None);
2240 order.set_liquidity_side(LiquiditySide::Taker);
2241 self.fill_limit_order(order);
2242 return;
2243 }
2244 } else {
2245 if self
2247 .core
2248 .is_touch_triggered(order.order_side_specified(), trigger_price)
2249 {
2250 self.generate_order_modify_rejected(
2251 order.trader_id(),
2252 order.strategy_id(),
2253 order.instrument_id(),
2254 order.client_order_id(),
2255 Ustr::from(
2256 format!(
2257 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2258 order.order_type(),
2259 order.order_side(),
2260 trigger_price,
2261 self.core
2262 .bid
2263 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2264 self.core
2265 .ask
2266 .map_or_else(|| "None".to_string(), |p| p.to_string())
2267 )
2268 .as_str(),
2269 ),
2270 order.venue_order_id(),
2271 order.account_id(),
2272 );
2273 return;
2274 }
2275 }
2276
2277 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
2278 }
2279
2280 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
2281 let (new_trigger_price, new_price) = trailing_stop_calculate(
2282 self.instrument.price_increment(),
2283 order.trigger_price(),
2284 order.activation_price(),
2285 order,
2286 self.core.bid,
2287 self.core.ask,
2288 self.core.last,
2289 )
2290 .unwrap();
2291
2292 if new_trigger_price.is_none() && new_price.is_none() {
2293 return;
2294 }
2295
2296 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
2297 }
2298
2299 fn update_protection_price(&mut self, order: &mut OrderAny) {
2300 let protection_price = protection_price_calculate(
2301 self.instrument.price_increment(),
2302 order,
2303 self.config.price_protection_points,
2304 self.core.bid,
2305 self.core.ask,
2306 );
2307
2308 if let Ok(protection_price) = protection_price {
2309 self.generate_order_updated(
2310 order,
2311 order.quantity(),
2312 None,
2313 None,
2314 Some(protection_price),
2315 );
2316 }
2317 }
2318
2319 fn accept_order(&mut self, order: &mut OrderAny) {
2322 if order.is_closed() {
2323 return;
2325 }
2326 if order.status() != OrderStatus::Accepted {
2327 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2328 self.generate_order_accepted(order, venue_order_id);
2329
2330 if matches!(
2331 order.order_type(),
2332 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
2333 ) && order.trigger_price().is_none()
2334 {
2335 self.update_trailing_stop_order(order);
2336 }
2337 }
2338
2339 let _ = self.core.add_order(
2340 PassiveOrderAny::try_from(order.to_owned()).expect("passive order conversion"),
2341 );
2342 }
2343
2344 fn expire_order(&mut self, order: &PassiveOrderAny) {
2345 if self.config.support_contingent_orders
2346 && order
2347 .contingency_type()
2348 .is_some_and(|c| c != ContingencyType::NoContingency)
2349 {
2350 self.cancel_contingent_orders(&OrderAny::from(order.clone()));
2351 }
2352
2353 self.generate_order_expired(&order.to_any());
2354 }
2355
2356 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
2357 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
2358 if order.is_active_local() {
2359 log::error!(
2360 "Cannot cancel an order with {} from the matching engine",
2361 order.status()
2362 );
2363 return;
2364 }
2365
2366 if self.core.order_exists(order.client_order_id()) {
2368 let _ = self.core.delete_order(
2369 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
2370 );
2371 }
2372 self.cached_filled_qty.remove(&order.client_order_id());
2373
2374 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2375 self.generate_order_canceled(order, venue_order_id);
2376
2377 if self.config.support_contingent_orders
2378 && order.contingency_type().is_some()
2379 && order.contingency_type().unwrap() != ContingencyType::NoContingency
2380 && cancel_contingencies
2381 {
2382 self.cancel_contingent_orders(order);
2383 }
2384 }
2385
2386 fn update_order(
2387 &mut self,
2388 order: &mut OrderAny,
2389 quantity: Option<Quantity>,
2390 price: Option<Price>,
2391 trigger_price: Option<Price>,
2392 update_contingencies: Option<bool>,
2393 ) {
2394 let update_contingencies = update_contingencies.unwrap_or(true);
2395 let quantity = quantity.unwrap_or(order.quantity());
2396
2397 let price_prec = self.instrument.price_precision();
2398 let size_prec = self.instrument.size_precision();
2399 let instrument_id = self.instrument.id();
2400 if quantity.precision != size_prec {
2401 self.generate_order_modify_rejected(
2402 order.trader_id(),
2403 order.strategy_id(),
2404 order.instrument_id(),
2405 order.client_order_id(),
2406 Ustr::from(&format!(
2407 "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
2408 quantity.precision
2409 )),
2410 order.venue_order_id(),
2411 order.account_id(),
2412 );
2413 return;
2414 }
2415 if let Some(px) = price
2416 && px.precision != price_prec
2417 {
2418 self.generate_order_modify_rejected(
2419 order.trader_id(),
2420 order.strategy_id(),
2421 order.instrument_id(),
2422 order.client_order_id(),
2423 Ustr::from(&format!(
2424 "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
2425 px.precision
2426 )),
2427 order.venue_order_id(),
2428 order.account_id(),
2429 );
2430 return;
2431 }
2432 if let Some(tp) = trigger_price
2433 && tp.precision != price_prec
2434 {
2435 self.generate_order_modify_rejected(
2436 order.trader_id(),
2437 order.strategy_id(),
2438 order.instrument_id(),
2439 order.client_order_id(),
2440 Ustr::from(&format!(
2441 "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
2442 tp.precision
2443 )),
2444 order.venue_order_id(),
2445 order.account_id(),
2446 );
2447 return;
2448 }
2449
2450 let filled_qty = self
2452 .cached_filled_qty
2453 .get(&order.client_order_id())
2454 .copied()
2455 .unwrap_or(order.filled_qty());
2456 if quantity < filled_qty {
2457 self.generate_order_modify_rejected(
2458 order.trader_id(),
2459 order.strategy_id(),
2460 order.instrument_id(),
2461 order.client_order_id(),
2462 Ustr::from(&format!(
2463 "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
2464 )),
2465 order.venue_order_id(),
2466 order.account_id(),
2467 );
2468 return;
2469 }
2470
2471 match order {
2472 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2473 let price = price.unwrap_or(order.price().unwrap());
2474 self.update_limit_order(order, quantity, price);
2475 }
2476 OrderAny::StopMarket(_) => {
2477 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2478 self.update_stop_market_order(order, quantity, trigger_price);
2479 }
2480 OrderAny::StopLimit(_) => {
2481 let price = price.unwrap_or(order.price().unwrap());
2482 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2483 self.update_stop_limit_order(order, quantity, price, trigger_price);
2484 }
2485 OrderAny::MarketIfTouched(_) => {
2486 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2487 self.update_market_if_touched_order(order, quantity, trigger_price);
2488 }
2489 OrderAny::LimitIfTouched(_) => {
2490 let price = price.unwrap_or(order.price().unwrap());
2491 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2492 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2493 }
2494 OrderAny::TrailingStopMarket(_) => {
2495 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2496 self.update_market_if_touched_order(order, quantity, trigger_price);
2497 }
2498 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2499 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2500 let trigger_price =
2501 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2502 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2503 }
2504 _ => {
2505 panic!(
2506 "Unsupported order type {} for update_order",
2507 order.order_type()
2508 );
2509 }
2510 }
2511
2512 let new_leaves_qty = quantity.saturating_sub(filled_qty);
2514 if new_leaves_qty.is_zero() {
2515 if self.config.support_contingent_orders
2516 && order
2517 .contingency_type()
2518 .is_some_and(|c| c != ContingencyType::NoContingency)
2519 && update_contingencies
2520 {
2521 self.update_contingent_order(order);
2522 }
2523 self.cancel_order(order, Some(false));
2525 return;
2526 }
2527
2528 if self.config.support_contingent_orders
2529 && order
2530 .contingency_type()
2531 .is_some_and(|c| c != ContingencyType::NoContingency)
2532 && update_contingencies
2533 {
2534 self.update_contingent_order(order);
2535 }
2536 }
2537
2538 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
2539 todo!("trigger_stop_order")
2540 }
2541
2542 fn update_contingent_order(&mut self, order: &OrderAny) {
2543 log::debug!("Updating OUO orders from {}", order.client_order_id());
2544 if let Some(linked_order_ids) = order.linked_order_ids() {
2545 let parent_filled_qty = self
2546 .cached_filled_qty
2547 .get(&order.client_order_id())
2548 .copied()
2549 .unwrap_or(order.filled_qty());
2550 let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
2551
2552 for client_order_id in linked_order_ids {
2553 let mut child_order = match self.cache.borrow().order(client_order_id) {
2554 Some(order) => order.clone(),
2555 None => panic!("Order {client_order_id} not found in cache."),
2556 };
2557
2558 if child_order.is_active_local() {
2559 continue;
2560 }
2561
2562 let child_filled_qty = self
2563 .cached_filled_qty
2564 .get(&child_order.client_order_id())
2565 .copied()
2566 .unwrap_or(child_order.filled_qty());
2567
2568 if parent_leaves_qty.is_zero() {
2569 self.cancel_order(&child_order, Some(false));
2570 } else if child_filled_qty >= parent_leaves_qty {
2571 self.cancel_order(&child_order, Some(false));
2573 } else {
2574 let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
2575 if child_leaves_qty != parent_leaves_qty {
2576 let price = child_order.price();
2577 let trigger_price = child_order.trigger_price();
2578 self.update_order(
2579 &mut child_order,
2580 Some(parent_leaves_qty),
2581 price,
2582 trigger_price,
2583 Some(false),
2584 );
2585 }
2586 }
2587 }
2588 }
2589 }
2590
2591 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2592 if let Some(linked_order_ids) = order.linked_order_ids() {
2593 for client_order_id in linked_order_ids {
2594 let contingent_order = match self.cache.borrow().order(client_order_id) {
2595 Some(order) => order.clone(),
2596 None => panic!("Cannot find contingent order for {client_order_id}"),
2597 };
2598 if contingent_order.is_active_local() {
2599 continue;
2601 }
2602 if !contingent_order.is_closed() {
2603 self.cancel_order(&contingent_order, Some(false));
2604 }
2605 }
2606 }
2607 }
2608
2609 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2612 let ts_now = self.clock.borrow().timestamp_ns();
2613 let account_id = order
2614 .account_id()
2615 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2616
2617 let due_post_only = reason.as_str().starts_with("POST_ONLY");
2619
2620 let event = OrderEventAny::Rejected(OrderRejected::new(
2621 order.trader_id(),
2622 order.strategy_id(),
2623 order.instrument_id(),
2624 order.client_order_id(),
2625 account_id,
2626 reason,
2627 UUID4::new(),
2628 ts_now,
2629 ts_now,
2630 false,
2631 due_post_only,
2632 ));
2633 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2634 }
2635
2636 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2637 let ts_now = self.clock.borrow().timestamp_ns();
2638 let account_id = order
2639 .account_id()
2640 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2641 let event = OrderEventAny::Accepted(OrderAccepted::new(
2642 order.trader_id(),
2643 order.strategy_id(),
2644 order.instrument_id(),
2645 order.client_order_id(),
2646 venue_order_id,
2647 account_id,
2648 UUID4::new(),
2649 ts_now,
2650 ts_now,
2651 false,
2652 ));
2653 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2654
2655 order.apply(event).expect("Failed to apply order event");
2657 }
2658
2659 #[allow(clippy::too_many_arguments)]
2660 fn generate_order_modify_rejected(
2661 &self,
2662 trader_id: TraderId,
2663 strategy_id: StrategyId,
2664 instrument_id: InstrumentId,
2665 client_order_id: ClientOrderId,
2666 reason: Ustr,
2667 venue_order_id: Option<VenueOrderId>,
2668 account_id: Option<AccountId>,
2669 ) {
2670 let ts_now = self.clock.borrow().timestamp_ns();
2671 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2672 trader_id,
2673 strategy_id,
2674 instrument_id,
2675 client_order_id,
2676 reason,
2677 UUID4::new(),
2678 ts_now,
2679 ts_now,
2680 false,
2681 venue_order_id,
2682 account_id,
2683 ));
2684 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2685 }
2686
2687 #[allow(clippy::too_many_arguments)]
2688 fn generate_order_cancel_rejected(
2689 &self,
2690 trader_id: TraderId,
2691 strategy_id: StrategyId,
2692 account_id: AccountId,
2693 instrument_id: InstrumentId,
2694 client_order_id: ClientOrderId,
2695 venue_order_id: VenueOrderId,
2696 reason: Ustr,
2697 ) {
2698 let ts_now = self.clock.borrow().timestamp_ns();
2699 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
2700 trader_id,
2701 strategy_id,
2702 instrument_id,
2703 client_order_id,
2704 reason,
2705 UUID4::new(),
2706 ts_now,
2707 ts_now,
2708 false,
2709 Some(venue_order_id),
2710 Some(account_id),
2711 ));
2712 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2713 }
2714
2715 fn generate_order_updated(
2716 &self,
2717 order: &mut OrderAny,
2718 quantity: Quantity,
2719 price: Option<Price>,
2720 trigger_price: Option<Price>,
2721 protection_price: Option<Price>,
2722 ) {
2723 let ts_now = self.clock.borrow().timestamp_ns();
2724 let event = OrderEventAny::Updated(OrderUpdated::new(
2725 order.trader_id(),
2726 order.strategy_id(),
2727 order.instrument_id(),
2728 order.client_order_id(),
2729 quantity,
2730 UUID4::new(),
2731 ts_now,
2732 ts_now,
2733 false,
2734 order.venue_order_id(),
2735 order.account_id(),
2736 price,
2737 trigger_price,
2738 protection_price,
2739 ));
2740 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2741
2742 order.apply(event).expect("Failed to apply order event");
2744 }
2745
2746 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
2747 let ts_now = self.clock.borrow().timestamp_ns();
2748 let event = OrderEventAny::Canceled(OrderCanceled::new(
2749 order.trader_id(),
2750 order.strategy_id(),
2751 order.instrument_id(),
2752 order.client_order_id(),
2753 UUID4::new(),
2754 ts_now,
2755 ts_now,
2756 false,
2757 Some(venue_order_id),
2758 order.account_id(),
2759 ));
2760 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2761 }
2762
2763 fn generate_order_triggered(&self, order: &OrderAny) {
2764 let ts_now = self.clock.borrow().timestamp_ns();
2765 let event = OrderEventAny::Triggered(OrderTriggered::new(
2766 order.trader_id(),
2767 order.strategy_id(),
2768 order.instrument_id(),
2769 order.client_order_id(),
2770 UUID4::new(),
2771 ts_now,
2772 ts_now,
2773 false,
2774 order.venue_order_id(),
2775 order.account_id(),
2776 ));
2777 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2778 }
2779
2780 fn generate_order_expired(&self, order: &OrderAny) {
2781 let ts_now = self.clock.borrow().timestamp_ns();
2782 let event = OrderEventAny::Expired(OrderExpired::new(
2783 order.trader_id(),
2784 order.strategy_id(),
2785 order.instrument_id(),
2786 order.client_order_id(),
2787 UUID4::new(),
2788 ts_now,
2789 ts_now,
2790 false,
2791 order.venue_order_id(),
2792 order.account_id(),
2793 ));
2794 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2795 }
2796
2797 #[allow(clippy::too_many_arguments)]
2798 fn generate_order_filled(
2799 &mut self,
2800 order: &mut OrderAny,
2801 venue_order_id: VenueOrderId,
2802 venue_position_id: Option<PositionId>,
2803 last_qty: Quantity,
2804 last_px: Price,
2805 quote_currency: Currency,
2806 commission: Money,
2807 liquidity_side: LiquiditySide,
2808 ) {
2809 let ts_now = self.clock.borrow().timestamp_ns();
2810 let account_id = order
2811 .account_id()
2812 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2813 let event = OrderEventAny::Filled(OrderFilled::new(
2814 order.trader_id(),
2815 order.strategy_id(),
2816 order.instrument_id(),
2817 order.client_order_id(),
2818 venue_order_id,
2819 account_id,
2820 self.ids_generator.generate_trade_id(),
2821 order.order_side(),
2822 order.order_type(),
2823 last_qty,
2824 last_px,
2825 quote_currency,
2826 liquidity_side,
2827 UUID4::new(),
2828 ts_now,
2829 ts_now,
2830 false,
2831 venue_position_id,
2832 Some(commission),
2833 ));
2834 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2835
2836 order.apply(event).expect("Failed to apply order event");
2838 }
2839}