1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::RefCell,
23 cmp::min,
24 fmt::Debug,
25 ops::{Add, Sub},
26 rc::Rc,
27};
28
29use ahash::AHashMap;
30use chrono::TimeDelta;
31use nautilus_common::{
32 cache::Cache,
33 clock::Clock,
34 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
35 msgbus,
36};
37use nautilus_core::{UUID4, UnixNanos};
38use nautilus_model::{
39 data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
40 enums::{
41 AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
42 LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
43 OrderStatus, OrderType, PriceType, TimeInForce,
44 },
45 events::{
46 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
47 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
48 },
49 identifiers::{
50 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
51 VenueOrderId,
52 },
53 instruments::{Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
56 position::Position,
57 types::{
58 Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
59 quantity::QuantityRaw,
60 },
61};
62use ustr::Ustr;
63
64use crate::{
65 matching_core::OrderMatchingCore,
66 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
67 models::{
68 fee::{FeeModel, FeeModelAny},
69 fill::FillModel,
70 },
71 protection::protection_price_calculate,
72 trailing::trailing_stop_calculate,
73};
74
75pub struct OrderMatchingEngine {
77 pub venue: Venue,
79 pub instrument: InstrumentAny,
81 pub raw_id: u32,
83 pub book_type: BookType,
85 pub oms_type: OmsType,
87 pub account_type: AccountType,
89 pub market_status: MarketStatus,
91 pub config: OrderMatchingEngineConfig,
93 clock: Rc<RefCell<dyn Clock>>,
94 cache: Rc<RefCell<Cache>>,
95 book: OrderBook,
96 pub core: OrderMatchingCore,
97 fill_model: FillModel,
98 fee_model: FeeModelAny,
99 target_bid: Option<Price>,
100 target_ask: Option<Price>,
101 target_last: Option<Price>,
102 last_bar_bid: Option<Bar>,
103 last_bar_ask: Option<Bar>,
104 execution_bar_types: AHashMap<InstrumentId, BarType>,
105 execution_bar_deltas: AHashMap<BarType, TimeDelta>,
106 account_ids: AHashMap<TraderId, AccountId>,
107 cached_filled_qty: AHashMap<ClientOrderId, Quantity>,
108 ids_generator: IdsGenerator,
109 last_trade_size: Option<Quantity>,
110 bid_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
111 ask_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
112 trade_consumption: QuantityRaw,
113}
114
115impl Debug for OrderMatchingEngine {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct(stringify!(OrderMatchingEngine))
118 .field("venue", &self.venue)
119 .field("instrument", &self.instrument.id())
120 .finish()
121 }
122}
123
124impl OrderMatchingEngine {
125 #[allow(clippy::too_many_arguments)]
127 pub fn new(
128 instrument: InstrumentAny,
129 raw_id: u32,
130 fill_model: FillModel,
131 fee_model: FeeModelAny,
132 book_type: BookType,
133 oms_type: OmsType,
134 account_type: AccountType,
135 clock: Rc<RefCell<dyn Clock>>,
136 cache: Rc<RefCell<Cache>>,
137 config: OrderMatchingEngineConfig,
138 ) -> Self {
139 let book = OrderBook::new(instrument.id(), book_type);
140 let core = OrderMatchingCore::new(
141 instrument.id(),
142 instrument.price_increment(),
143 None, None, None, );
147 let ids_generator = IdsGenerator::new(
148 instrument.id().venue,
149 oms_type,
150 raw_id,
151 config.use_random_ids,
152 config.use_position_ids,
153 cache.clone(),
154 );
155
156 Self {
157 venue: instrument.id().venue,
158 instrument,
159 raw_id,
160 fill_model,
161 fee_model,
162 book_type,
163 oms_type,
164 account_type,
165 clock,
166 cache,
167 book,
168 core,
169 market_status: MarketStatus::Open,
170 config,
171 target_bid: None,
172 target_ask: None,
173 target_last: None,
174 last_bar_bid: None,
175 last_bar_ask: None,
176 execution_bar_types: AHashMap::new(),
177 execution_bar_deltas: AHashMap::new(),
178 account_ids: AHashMap::new(),
179 cached_filled_qty: AHashMap::new(),
180 ids_generator,
181 last_trade_size: None,
182 bid_consumption: AHashMap::new(),
183 ask_consumption: AHashMap::new(),
184 trade_consumption: 0,
185 }
186 }
187
188 pub fn reset(&mut self) {
194 self.book.clear(0, UnixNanos::default());
195 self.execution_bar_types.clear();
196 self.execution_bar_deltas.clear();
197 self.account_ids.clear();
198 self.cached_filled_qty.clear();
199 self.core.reset();
200 self.target_bid = None;
201 self.target_ask = None;
202 self.target_last = None;
203 self.last_trade_size = None;
204 self.bid_consumption.clear();
205 self.ask_consumption.clear();
206 self.trade_consumption = 0;
207 self.ids_generator.reset();
208
209 log::info!("Reset {}", self.instrument.id());
210 }
211
212 fn apply_liquidity_consumption(
213 &mut self,
214 fills: Vec<(Price, Quantity)>,
215 order_side: OrderSide,
216 leaves_qty: Quantity,
217 ) -> Vec<(Price, Quantity)> {
218 if !self.config.liquidity_consumption {
219 return fills;
220 }
221
222 let consumption = match order_side {
223 OrderSide::Buy => &mut self.ask_consumption,
224 OrderSide::Sell => &mut self.bid_consumption,
225 _ => return fills,
226 };
227
228 let mut adjusted_fills = Vec::with_capacity(fills.len());
229 let mut remaining_qty = leaves_qty.raw;
230
231 for (price, qty) in fills {
232 if remaining_qty == 0 {
233 break;
234 }
235
236 let price_raw = price.raw;
237 let book_size_f64 = self.book.get_quantity_for_price(price, order_side);
238 let book_size_raw = (book_size_f64 * 10f64.powi(FIXED_PRECISION as i32)) as QuantityRaw;
239
240 let (original_size, consumed) =
241 consumption.entry(price_raw).or_insert((book_size_raw, 0));
242
243 if *original_size != book_size_raw {
245 *original_size = book_size_raw;
246 *consumed = 0;
247 }
248
249 let available = original_size.saturating_sub(*consumed);
250 if available == 0 {
251 continue;
252 }
253
254 let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
255 if adjusted_qty_raw == 0 {
256 continue;
257 }
258
259 *consumed += adjusted_qty_raw;
260 remaining_qty -= adjusted_qty_raw;
261
262 let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
263 adjusted_fills.push((price, adjusted_qty));
264 }
265
266 adjusted_fills
267 }
268
269 pub const fn set_fill_model(&mut self, fill_model: FillModel) {
271 self.fill_model = fill_model;
272 }
273
274 #[must_use]
275 pub fn best_bid_price(&self) -> Option<Price> {
277 self.book.best_bid_price()
278 }
279
280 #[must_use]
281 pub fn best_ask_price(&self) -> Option<Price> {
283 self.book.best_ask_price()
284 }
285
286 #[must_use]
287 pub const fn get_book(&self) -> &OrderBook {
289 &self.book
290 }
291
292 #[must_use]
293 pub const fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
295 self.core.get_orders_bid()
296 }
297
298 #[must_use]
299 pub const fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
301 self.core.get_orders_ask()
302 }
303
304 #[must_use]
305 pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
307 let mut orders = Vec::new();
309 orders.extend_from_slice(self.core.get_orders_bid());
310 orders.extend_from_slice(self.core.get_orders_ask());
311 orders
312 }
313
314 #[must_use]
315 pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
317 self.core.order_exists(client_order_id)
318 }
319
320 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
330 log::debug!("Processing {delta}");
331
332 if matches!(delta.action, BookAction::Add | BookAction::Update) {
334 let price_prec = self.instrument.price_precision();
335 let size_prec = self.instrument.size_precision();
336 let instrument_id = self.instrument.id();
337
338 if delta.order.price.precision != price_prec {
339 anyhow::bail!(
340 "Invalid delta order price precision {prec}, expected {price_prec} for {instrument_id}",
341 prec = delta.order.price.precision
342 );
343 }
344 if delta.order.size.precision != size_prec {
345 anyhow::bail!(
346 "Invalid delta order size precision {prec}, expected {size_prec} for {instrument_id}",
347 prec = delta.order.size.precision
348 );
349 }
350 }
351
352 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
353 self.book.apply_delta(delta)?;
354 }
355
356 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
357 Ok(())
358 }
359
360 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
368 log::debug!("Processing {deltas}");
369
370 let price_prec = self.instrument.price_precision();
372 let size_prec = self.instrument.size_precision();
373 let instrument_id = self.instrument.id();
374
375 for delta in &deltas.deltas {
376 if matches!(delta.action, BookAction::Add | BookAction::Update) {
377 if delta.order.price.precision != price_prec {
378 anyhow::bail!(
379 "Invalid delta order price precision {prec}, expected {price_prec} for {instrument_id}",
380 prec = delta.order.price.precision
381 );
382 }
383 if delta.order.size.precision != size_prec {
384 anyhow::bail!(
385 "Invalid delta order size precision {prec}, expected {size_prec} for {instrument_id}",
386 prec = delta.order.size.precision
387 );
388 }
389 }
390 }
391
392 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
393 self.book.apply_deltas(deltas)?;
394 }
395
396 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
397 Ok(())
398 }
399
400 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
406 log::debug!("Processing {quote}");
407
408 let price_prec = self.instrument.price_precision();
409 let size_prec = self.instrument.size_precision();
410 let instrument_id = self.instrument.id();
411
412 assert!(
413 quote.bid_price.precision == price_prec,
414 "Invalid bid_price precision {}, expected {price_prec} for {instrument_id}",
415 quote.bid_price.precision
416 );
417 assert!(
418 quote.ask_price.precision == price_prec,
419 "Invalid ask_price precision {}, expected {price_prec} for {instrument_id}",
420 quote.ask_price.precision
421 );
422 assert!(
423 quote.bid_size.precision == size_prec,
424 "Invalid bid_size precision {}, expected {size_prec} for {instrument_id}",
425 quote.bid_size.precision
426 );
427 assert!(
428 quote.ask_size.precision == size_prec,
429 "Invalid ask_size precision {}, expected {size_prec} for {instrument_id}",
430 quote.ask_size.precision
431 );
432
433 if self.book_type == BookType::L1_MBP {
434 self.book.update_quote_tick(quote).unwrap();
435 }
436
437 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
438 }
439
440 pub fn process_bar(&mut self, bar: &Bar) {
451 log::debug!("Processing {bar}");
452
453 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
455 return;
456 }
457
458 let bar_type = bar.bar_type;
459 if bar_type.aggregation_source() == AggregationSource::Internal {
461 return;
462 }
463
464 let price_prec = self.instrument.price_precision();
465 let size_prec = self.instrument.size_precision();
466 let instrument_id = self.instrument.id();
467
468 assert!(
469 bar.open.precision == price_prec,
470 "Invalid bar open precision {}, expected {price_prec} for {instrument_id}",
471 bar.open.precision
472 );
473 assert!(
474 bar.high.precision == price_prec,
475 "Invalid bar high precision {}, expected {price_prec} for {instrument_id}",
476 bar.high.precision
477 );
478 assert!(
479 bar.low.precision == price_prec,
480 "Invalid bar low precision {}, expected {price_prec} for {instrument_id}",
481 bar.low.precision
482 );
483 assert!(
484 bar.close.precision == price_prec,
485 "Invalid bar close precision {}, expected {price_prec} for {instrument_id}",
486 bar.close.precision
487 );
488 assert!(
489 bar.volume.precision == size_prec,
490 "Invalid bar volume precision {}, expected {size_prec} for {instrument_id}",
491 bar.volume.precision
492 );
493
494 let execution_bar_type =
495 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
496 execution_bar_type.to_owned()
497 } else {
498 self.execution_bar_types
499 .insert(bar.instrument_id(), bar_type);
500 self.execution_bar_deltas
501 .insert(bar_type, bar_type.spec().timedelta());
502 bar_type
503 };
504
505 if execution_bar_type != bar_type {
506 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
507 if bar_type_timedelta.is_none() {
508 bar_type_timedelta = Some(bar_type.spec().timedelta());
509 self.execution_bar_deltas
510 .insert(bar_type, bar_type_timedelta.unwrap());
511 }
512 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
513 >= &bar_type_timedelta.unwrap()
514 {
515 self.execution_bar_types
516 .insert(bar_type.instrument_id(), bar_type);
517 } else {
518 return;
519 }
520 }
521
522 match bar_type.spec().price_type {
523 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
524 PriceType::Bid => {
525 self.last_bar_bid = Some(bar.to_owned());
526 self.process_quote_ticks_from_bar(bar);
527 }
528 PriceType::Ask => {
529 self.last_bar_ask = Some(bar.to_owned());
530 self.process_quote_ticks_from_bar(bar);
531 }
532 PriceType::Mark => panic!("Not implemented"),
533 }
534 }
535
536 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
537 let quarter_raw = bar.volume.raw / 4;
539 let remainder_raw = bar.volume.raw % 4;
540 let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
541 let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);
542
543 let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
544 {
545 AggressorSide::Buyer
546 } else {
547 AggressorSide::Seller
548 };
549
550 let mut trade_tick = TradeTick::new(
552 bar.instrument_id(),
553 bar.open,
554 size,
555 aggressor_side,
556 self.ids_generator.generate_trade_id(),
557 bar.ts_init,
558 bar.ts_init,
559 );
560
561 if !self.core.is_last_initialized {
564 self.book.update_trade_tick(&trade_tick).unwrap();
565 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
566 self.core.set_last_raw(trade_tick.price);
567 }
568
569 if self.core.last.is_some_and(|last| bar.high > last) {
572 trade_tick.price = bar.high;
573 trade_tick.aggressor_side = AggressorSide::Buyer;
574 trade_tick.trade_id = self.ids_generator.generate_trade_id();
575
576 self.book.update_trade_tick(&trade_tick).unwrap();
577 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
578
579 self.core.set_last_raw(trade_tick.price);
580 }
581
582 if self.core.last.is_some_and(|last| bar.low < last) {
586 trade_tick.price = bar.low;
587 trade_tick.aggressor_side = AggressorSide::Seller;
588 trade_tick.trade_id = self.ids_generator.generate_trade_id();
589
590 self.book.update_trade_tick(&trade_tick).unwrap();
591 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
592
593 self.core.set_last_raw(trade_tick.price);
594 }
595
596 if self.core.last.is_some_and(|last| bar.close != last) {
601 trade_tick.price = bar.close;
602 trade_tick.size = close_size;
603 if bar.close > self.core.last.unwrap() {
604 trade_tick.aggressor_side = AggressorSide::Buyer;
605 } else {
606 trade_tick.aggressor_side = AggressorSide::Seller;
607 }
608 trade_tick.trade_id = self.ids_generator.generate_trade_id();
609
610 self.book.update_trade_tick(&trade_tick).unwrap();
611 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
612
613 self.core.set_last_raw(trade_tick.price);
614 }
615 }
616
617 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
618 if self.last_bar_bid.is_none()
620 || self.last_bar_ask.is_none()
621 || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
622 {
623 return;
624 }
625 let bid_bar = self.last_bar_bid.unwrap();
626 let ask_bar = self.last_bar_ask.unwrap();
627
628 let bid_quarter = bid_bar.volume.raw / 4;
630 let bid_remainder = bid_bar.volume.raw % 4;
631 let ask_quarter = ask_bar.volume.raw / 4;
632 let ask_remainder = ask_bar.volume.raw % 4;
633
634 let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
635 let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
636 let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
637 let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
638
639 let mut quote_tick = QuoteTick::new(
641 self.book.instrument_id,
642 bid_bar.open,
643 ask_bar.open,
644 bid_size,
645 ask_size,
646 bid_bar.ts_init,
647 bid_bar.ts_init,
648 );
649
650 self.book.update_quote_tick("e_tick).unwrap();
652 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
653
654 quote_tick.bid_price = bid_bar.high;
656 quote_tick.ask_price = ask_bar.high;
657 self.book.update_quote_tick("e_tick).unwrap();
658 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
659
660 quote_tick.bid_price = bid_bar.low;
662 quote_tick.ask_price = ask_bar.low;
663 self.book.update_quote_tick("e_tick).unwrap();
664 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
665
666 quote_tick.bid_price = bid_bar.close;
668 quote_tick.ask_price = ask_bar.close;
669 quote_tick.bid_size = bid_close_size;
670 quote_tick.ask_size = ask_close_size;
671 self.book.update_quote_tick("e_tick).unwrap();
672 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
673
674 self.last_bar_bid = None;
676 self.last_bar_ask = None;
677 }
678
679 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
690 log::debug!("Processing {trade}");
691
692 let price_prec = self.instrument.price_precision();
693 let size_prec = self.instrument.size_precision();
694 let instrument_id = self.instrument.id();
695
696 assert!(
697 trade.price.precision == price_prec,
698 "Invalid trade price precision {}, expected {price_prec} for {instrument_id}",
699 trade.price.precision
700 );
701 assert!(
702 trade.size.precision == size_prec,
703 "Invalid trade size precision {}, expected {size_prec} for {instrument_id}",
704 trade.size.precision
705 );
706
707 if self.book_type == BookType::L1_MBP {
708 self.book.update_trade_tick(trade).unwrap();
709 }
710
711 let price_raw = trade.price.raw;
712 self.core.set_last_raw(trade.price);
713
714 let mut original_bid: Option<Price> = None;
715 let mut original_ask: Option<Price> = None;
716
717 let mut aggressor_side = AggressorSide::NoAggressor;
720
721 if self.config.trade_execution {
722 aggressor_side = trade.aggressor_side;
723
724 match aggressor_side {
725 AggressorSide::Buyer => {
726 if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
727 self.core.set_ask_raw(trade.price);
728 }
729 if self.core.bid.is_none()
730 || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
731 {
732 self.core.set_bid_raw(trade.price);
733 }
734 }
735 AggressorSide::Seller => {
736 if self.core.bid.is_none()
737 || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
738 {
739 self.core.set_bid_raw(trade.price);
740 }
741 if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
742 self.core.set_ask_raw(trade.price);
743 }
744 }
745 AggressorSide::NoAggressor => {
746 if self.core.bid.is_none()
747 || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
748 {
749 self.core.set_bid_raw(trade.price);
750 }
751 if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
752 self.core.set_ask_raw(trade.price);
753 }
754 }
755 }
756
757 original_bid = self.core.bid;
758 original_ask = self.core.ask;
759
760 match aggressor_side {
761 AggressorSide::Seller => {
762 if original_ask.is_some_and(|ask| price_raw < ask.raw) {
763 self.core.set_ask_raw(trade.price);
764 }
765 }
766 AggressorSide::Buyer => {
767 if original_bid.is_some_and(|bid| price_raw > bid.raw) {
768 self.core.set_bid_raw(trade.price);
769 }
770 }
771 AggressorSide::NoAggressor => {}
772 }
773
774 self.last_trade_size = Some(trade.size);
775 self.trade_consumption = 0;
776 }
777
778 self.iterate(trade.ts_init, aggressor_side);
779
780 if self.config.trade_execution {
781 self.last_trade_size = None;
782 self.trade_consumption = 0;
783
784 match aggressor_side {
785 AggressorSide::Seller => {
786 if let Some(ask) = original_ask
787 && price_raw < ask.raw
788 {
789 self.core.ask = Some(ask);
790 }
791 }
792 AggressorSide::Buyer => {
793 if let Some(bid) = original_bid
794 && price_raw > bid.raw
795 {
796 self.core.bid = Some(bid);
797 }
798 }
799 AggressorSide::NoAggressor => {}
800 }
801 }
802 }
803
804 pub fn process_status(&mut self, action: MarketStatusAction) {
806 log::debug!("Processing {action}");
807
808 if self.market_status == MarketStatus::Closed
810 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
811 {
812 self.market_status = MarketStatus::Open;
813 }
814 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
816 self.market_status = MarketStatus::Paused;
817 }
818 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
820 self.market_status = MarketStatus::Suspended;
821 }
822 if self.market_status == MarketStatus::Open
824 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
825 {
826 self.market_status = MarketStatus::Closed;
827 }
828 }
829
830 #[allow(clippy::needless_return)]
841 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
842 {
844 let cache_borrow = self.cache.as_ref().borrow();
845
846 if self.core.order_exists(order.client_order_id()) {
847 self.generate_order_rejected(order, "Order already exists".into());
848 return;
849 }
850
851 self.account_ids.insert(order.trader_id(), account_id);
853
854 if self.instrument.has_expiration() {
856 if let Some(activation_ns) = self.instrument.activation_ns()
857 && self.clock.borrow().timestamp_ns() < activation_ns
858 {
859 self.generate_order_rejected(
860 order,
861 format!(
862 "Contract {} is not yet active, activation {}",
863 self.instrument.id(),
864 self.instrument.activation_ns().unwrap()
865 )
866 .into(),
867 );
868 return;
869 }
870 if let Some(expiration_ns) = self.instrument.expiration_ns()
871 && self.clock.borrow().timestamp_ns() >= expiration_ns
872 {
873 self.generate_order_rejected(
874 order,
875 format!(
876 "Contract {} has expired, expiration {}",
877 self.instrument.id(),
878 self.instrument.expiration_ns().unwrap()
879 )
880 .into(),
881 );
882 return;
883 }
884 }
885
886 if self.config.support_contingent_orders {
888 if let Some(parent_order_id) = order.parent_order_id() {
889 let parent_order = cache_borrow.order(&parent_order_id);
890 if parent_order.is_none()
891 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
892 {
893 panic!("OTO parent not found");
894 }
895 if let Some(parent_order) = parent_order {
896 let parent_order_status = parent_order.status();
897 let order_is_open = order.is_open();
898 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
899 self.generate_order_rejected(
900 order,
901 format!("Rejected OTO order from {parent_order_id}").into(),
902 );
903 return;
904 } else if parent_order.status() == OrderStatus::Accepted
905 && parent_order.status() == OrderStatus::Triggered
906 {
907 log::info!(
908 "Pending OTO order {} triggers from {parent_order_id}",
909 order.client_order_id(),
910 );
911 return;
912 }
913 }
914 }
915
916 if let Some(linked_order_ids) = order.linked_order_ids() {
917 for client_order_id in linked_order_ids {
918 match cache_borrow.order(client_order_id) {
919 Some(contingent_order)
920 if (order.contingency_type().unwrap() == ContingencyType::Oco
921 || order.contingency_type().unwrap()
922 == ContingencyType::Ouo)
923 && !order.is_closed()
924 && contingent_order.is_closed() =>
925 {
926 self.generate_order_rejected(
927 order,
928 format!("Contingent order {client_order_id} already closed")
929 .into(),
930 );
931 return;
932 }
933 None => panic!("Cannot find contingent order for {client_order_id}"),
934 _ => {}
935 }
936 }
937 }
938 }
939
940 if order.quantity().precision != self.instrument.size_precision() {
942 self.generate_order_rejected(
943 order,
944 format!(
945 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
946 order.client_order_id(),
947 order.quantity().precision,
948 self.instrument.id(),
949 self.instrument.size_precision()
950 )
951 .into(),
952 );
953 return;
954 }
955
956 if let Some(price) = order.price()
958 && price.precision != self.instrument.price_precision()
959 {
960 self.generate_order_rejected(
961 order,
962 format!(
963 "Invalid order price precision for order {}, was {} when {} price precision is {}",
964 order.client_order_id(),
965 price.precision,
966 self.instrument.id(),
967 self.instrument.price_precision()
968 )
969 .into(),
970 );
971 return;
972 }
973
974 if let Some(trigger_price) = order.trigger_price()
976 && trigger_price.precision != self.instrument.price_precision()
977 {
978 self.generate_order_rejected(
979 order,
980 format!(
981 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
982 order.client_order_id(),
983 trigger_price.precision,
984 self.instrument.id(),
985 self.instrument.price_precision()
986 )
987 .into(),
988 );
989 return;
990 }
991
992 let position: Option<&Position> = cache_borrow
994 .position_for_order(&order.client_order_id())
995 .or_else(|| {
996 if self.oms_type == OmsType::Netting {
997 let position_id = PositionId::new(
998 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
999 );
1000 cache_borrow.position(&position_id)
1001 } else {
1002 None
1003 }
1004 });
1005
1006 if order.order_side() == OrderSide::Sell
1008 && self.account_type != AccountType::Margin
1009 && matches!(self.instrument, InstrumentAny::Equity(_))
1010 && (position.is_none()
1011 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
1012 {
1013 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
1014 self.generate_order_rejected(
1015 order,
1016 format!(
1017 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
1018 )
1019 .into(),
1020 );
1021 return;
1022 }
1023
1024 if self.config.use_reduce_only
1026 && order.is_reduce_only()
1027 && !order.is_closed()
1028 && position.is_none_or(|pos| {
1029 pos.is_closed()
1030 || (order.is_buy() && pos.is_long())
1031 || (order.is_sell() && pos.is_short())
1032 })
1033 {
1034 self.generate_order_rejected(
1035 order,
1036 format!(
1037 "Reduce-only order {} ({}-{}) would have increased position",
1038 order.client_order_id(),
1039 order.order_type().to_string().to_uppercase(),
1040 order.order_side().to_string().to_uppercase()
1041 )
1042 .into(),
1043 );
1044 return;
1045 }
1046 }
1047
1048 match order.order_type() {
1049 OrderType::Market if self.config.price_protection_points.is_some() => {
1050 self.process_market_order_with_protection(order);
1051 }
1052 OrderType::Market => self.process_market_order(order),
1053 OrderType::Limit => self.process_limit_order(order),
1054 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
1055 OrderType::StopMarket if self.config.price_protection_points.is_some() => {
1056 self.process_stop_market_order_with_protection(order);
1057 }
1058 OrderType::StopMarket => self.process_stop_market_order(order),
1059 OrderType::StopLimit => self.process_stop_limit_order(order),
1060 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
1061 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
1062 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
1063 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
1064 }
1065 }
1066
1067 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
1073 let orig_order = match self.core.get_order(command.client_order_id) {
1075 Some(order) => order.clone(),
1076 None => {
1077 self.generate_order_modify_rejected(
1078 command.trader_id,
1079 command.strategy_id,
1080 command.instrument_id,
1081 command.client_order_id,
1082 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1083 command.venue_order_id,
1084 Some(account_id),
1085 );
1086 return;
1087 }
1088 };
1089
1090 let mut order = orig_order.to_any();
1091 let update_success = self.update_order(
1092 &mut order,
1093 command.quantity,
1094 command.price,
1095 command.trigger_price,
1096 None,
1097 );
1098
1099 if update_success && order.is_open() {
1101 let _ = self.core.delete_order(&orig_order);
1102 let _ = self
1103 .core
1104 .add_order(PassiveOrderAny::try_from(order).expect("passive order conversion"));
1105 }
1106 }
1107
1108 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
1110 match self.core.get_order(command.client_order_id) {
1111 Some(passive_order) => {
1112 if passive_order.is_inflight() || passive_order.is_open() {
1113 self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
1114 }
1115 }
1116 None => self.generate_order_cancel_rejected(
1117 command.trader_id,
1118 command.strategy_id,
1119 account_id,
1120 command.instrument_id,
1121 command.client_order_id,
1122 command.venue_order_id,
1123 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1124 ),
1125 }
1126 }
1127
1128 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
1130 let instrument_id = command.instrument_id;
1131 let open_orders = self
1132 .cache
1133 .borrow()
1134 .orders_open(None, Some(&instrument_id), None, None)
1135 .into_iter()
1136 .cloned()
1137 .collect::<Vec<OrderAny>>();
1138 for order in open_orders {
1139 if command.order_side != OrderSide::NoOrderSide
1140 && command.order_side != order.order_side()
1141 {
1142 continue;
1143 }
1144 if order.is_inflight() || order.is_open() {
1145 self.cancel_order(&order, None);
1146 }
1147 }
1148 }
1149
1150 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
1152 for order in &command.cancels {
1153 self.process_cancel(order, account_id);
1154 }
1155 }
1156
1157 fn process_market_order(&mut self, order: &mut OrderAny) {
1158 if order.time_in_force() == TimeInForce::AtTheOpen
1159 || order.time_in_force() == TimeInForce::AtTheClose
1160 {
1161 log::error!(
1162 "Market auction for the time in force {} is currently not supported",
1163 order.time_in_force()
1164 );
1165 return;
1166 }
1167
1168 let order_side = order.order_side();
1170 let is_ask_initialized = self.core.is_ask_initialized;
1171 let is_bid_initialized = self.core.is_bid_initialized;
1172 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1173 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1174 {
1175 self.generate_order_rejected(
1176 order,
1177 format!("No market for {}", order.instrument_id()).into(),
1178 );
1179 return;
1180 }
1181
1182 self.fill_market_order(order);
1183 }
1184
1185 fn process_market_order_with_protection(&mut self, order: &mut OrderAny) {
1186 if order.time_in_force() == TimeInForce::AtTheOpen
1187 || order.time_in_force() == TimeInForce::AtTheClose
1188 {
1189 log::error!(
1190 "Market auction for the time in force {} is currently not supported",
1191 order.time_in_force()
1192 );
1193 return;
1194 }
1195
1196 let order_side = order.order_side();
1198 let is_ask_initialized = self.core.is_ask_initialized;
1199 let is_bid_initialized = self.core.is_bid_initialized;
1200 if (order_side == OrderSide::Buy && !self.core.is_ask_initialized)
1201 || (order_side == OrderSide::Sell && !self.core.is_bid_initialized)
1202 {
1203 self.generate_order_rejected(
1204 order,
1205 format!("No market for {}", order.instrument_id()).into(),
1206 );
1207 return;
1208 }
1209
1210 self.update_protection_price(order);
1211
1212 let protection_price = order
1213 .price()
1214 .expect("Market order with protection must have a protection price");
1215
1216 self.accept_order(order);
1218
1219 if self
1221 .core
1222 .is_limit_matched(order.order_side_specified(), protection_price)
1223 {
1224 if order.liquidity_side().is_some()
1226 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
1227 {
1228 order.set_liquidity_side(LiquiditySide::Taker);
1229 }
1230 self.fill_limit_order(order);
1231 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1232 self.cancel_order(order, None);
1233 }
1234 }
1235
1236 fn process_limit_order(&mut self, order: &mut OrderAny) {
1237 let limit_px = order.price().expect("Limit order must have a price");
1238 if order.is_post_only()
1239 && self
1240 .core
1241 .is_limit_matched(order.order_side_specified(), limit_px)
1242 {
1243 self.generate_order_rejected(
1244 order,
1245 format!(
1246 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
1247 order.order_type(),
1248 order.order_side(),
1249 order.price().unwrap(),
1250 self.core
1251 .bid
1252 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1253 self.core
1254 .ask
1255 .map_or_else(|| "None".to_string(), |p| p.to_string())
1256 )
1257 .into(),
1258 );
1259 return;
1260 }
1261
1262 self.accept_order(order);
1264
1265 if self
1267 .core
1268 .is_limit_matched(order.order_side_specified(), limit_px)
1269 {
1270 if order.liquidity_side().is_some()
1272 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
1273 {
1274 order.set_liquidity_side(LiquiditySide::Taker);
1275 }
1276 self.fill_limit_order(order);
1277 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1278 self.cancel_order(order, None);
1279 }
1280 }
1281
1282 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
1283 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1285 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1286 {
1287 self.generate_order_rejected(
1288 order,
1289 format!("No market for {}", order.instrument_id()).into(),
1290 );
1291 return;
1292 }
1293
1294 self.fill_market_order(order);
1296
1297 if order.is_open() {
1298 self.accept_order(order);
1299 }
1300 }
1301
1302 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
1303 let stop_px = order
1304 .trigger_price()
1305 .expect("Stop order must have a trigger price");
1306 if self
1307 .core
1308 .is_stop_matched(order.order_side_specified(), stop_px)
1309 {
1310 if self.config.reject_stop_orders {
1311 self.generate_order_rejected(
1312 order,
1313 format!(
1314 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1315 order.order_type(),
1316 order.order_side(),
1317 order.trigger_price().unwrap(),
1318 self.core
1319 .bid
1320 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1321 self.core
1322 .ask
1323 .map_or_else(|| "None".to_string(), |p| p.to_string())
1324 ).into(),
1325 );
1326 return;
1327 }
1328 self.fill_market_order(order);
1329 return;
1330 }
1331
1332 self.accept_order(order);
1334 }
1335
1336 fn process_stop_market_order_with_protection(&mut self, order: &mut OrderAny) {
1337 let stop_px = order
1338 .trigger_price()
1339 .expect("Stop order must have a trigger price");
1340
1341 let order_side = order.order_side();
1342 let is_ask_initialized = self.core.is_ask_initialized;
1343 let is_bid_initialized = self.core.is_bid_initialized;
1344 if (order_side == OrderSide::Buy && !self.core.is_ask_initialized)
1345 || (order_side == OrderSide::Sell && !self.core.is_bid_initialized)
1346 {
1347 self.generate_order_rejected(
1348 order,
1349 format!("No market for {}", order.instrument_id()).into(),
1350 );
1351 return;
1352 }
1353
1354 self.update_protection_price(order);
1355 let protection_price = order
1356 .price()
1357 .expect("Market order with protection must have a protection price");
1358
1359 if self
1360 .core
1361 .is_stop_matched(order.order_side_specified(), stop_px)
1362 {
1363 if self.config.reject_stop_orders {
1364 self.generate_order_rejected(
1365 order,
1366 format!(
1367 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1368 order.order_type(),
1369 order.order_side(),
1370 order.trigger_price().unwrap(),
1371 self.core
1372 .bid
1373 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1374 self.core
1375 .ask
1376 .map_or_else(|| "None".to_string(), |p| p.to_string())
1377 ).into(),
1378 );
1379 return;
1380 } else {
1381 self.accept_order(order);
1383 }
1384
1385 if self
1386 .core
1387 .is_limit_matched(order.order_side_specified(), protection_price)
1388 {
1389 self.fill_limit_order(order);
1390 }
1391 return;
1392 }
1393 self.accept_order(order);
1395 }
1396
1397 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
1398 let stop_px = order
1399 .trigger_price()
1400 .expect("Stop order must have a trigger price");
1401 if self
1402 .core
1403 .is_stop_matched(order.order_side_specified(), stop_px)
1404 {
1405 if self.config.reject_stop_orders {
1406 self.generate_order_rejected(
1407 order,
1408 format!(
1409 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1410 order.order_type(),
1411 order.order_side(),
1412 order.trigger_price().unwrap(),
1413 self.core
1414 .bid
1415 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1416 self.core
1417 .ask
1418 .map_or_else(|| "None".to_string(), |p| p.to_string())
1419 ).into(),
1420 );
1421 return;
1422 }
1423
1424 self.accept_order(order);
1425 self.generate_order_triggered(order);
1426
1427 let limit_px = order.price().expect("Stop limit order must have a price");
1429 if self
1430 .core
1431 .is_limit_matched(order.order_side_specified(), limit_px)
1432 {
1433 order.set_liquidity_side(LiquiditySide::Taker);
1434 self.fill_limit_order(order);
1435 }
1436
1437 return;
1439 }
1440
1441 self.accept_order(order);
1442 }
1443
1444 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
1445 if self
1446 .core
1447 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1448 {
1449 if self.config.reject_stop_orders {
1450 self.generate_order_rejected(
1451 order,
1452 format!(
1453 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1454 order.order_type(),
1455 order.order_side(),
1456 order.trigger_price().unwrap(),
1457 self.core
1458 .bid
1459 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1460 self.core
1461 .ask
1462 .map_or_else(|| "None".to_string(), |p| p.to_string())
1463 ).into(),
1464 );
1465 return;
1466 }
1467 self.fill_market_order(order);
1468 return;
1469 }
1470
1471 self.accept_order(order);
1473 }
1474
1475 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1476 if self
1477 .core
1478 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1479 {
1480 if self.config.reject_stop_orders {
1481 self.generate_order_rejected(
1482 order,
1483 format!(
1484 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1485 order.order_type(),
1486 order.order_side(),
1487 order.trigger_price().unwrap(),
1488 self.core
1489 .bid
1490 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1491 self.core
1492 .ask
1493 .map_or_else(|| "None".to_string(), |p| p.to_string())
1494 ).into(),
1495 );
1496 return;
1497 }
1498 self.accept_order(order);
1499 self.generate_order_triggered(order);
1500
1501 if self
1503 .core
1504 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1505 {
1506 order.set_liquidity_side(LiquiditySide::Taker);
1507 self.fill_limit_order(order);
1508 }
1509 return;
1510 }
1511
1512 self.accept_order(order);
1514 }
1515
1516 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1517 if let Some(trigger_price) = order.trigger_price()
1518 && self
1519 .core
1520 .is_stop_matched(order.order_side_specified(), trigger_price)
1521 {
1522 self.generate_order_rejected(
1523 order,
1524 format!(
1525 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1526 order.order_type(),
1527 order.order_side(),
1528 trigger_price,
1529 self.core
1530 .bid
1531 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1532 self.core
1533 .ask
1534 .map_or_else(|| "None".to_string(), |p| p.to_string())
1535 ).into(),
1536 );
1537 return;
1538 }
1539
1540 self.accept_order(order);
1542 }
1543
1544 pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
1557 if aggressor_side == AggressorSide::NoAggressor {
1562 if self.book.has_bid() {
1563 self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1564 }
1565 if self.book.has_ask() {
1566 self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1567 }
1568 }
1569
1570 self.core.iterate();
1571
1572 let orders_bid = self.core.get_orders_bid().to_vec();
1573 let orders_ask = self.core.get_orders_ask().to_vec();
1574
1575 self.iterate_orders(timestamp_ns, &orders_bid);
1576 self.iterate_orders(timestamp_ns, &orders_ask);
1577
1578 self.core.bid = self.book.best_bid_price();
1581 self.core.ask = self.book.best_ask_price();
1582 }
1583
1584 fn maybe_activate_trailing_stop(
1585 &mut self,
1586 order: &mut OrderAny,
1587 bid: Option<Price>,
1588 ask: Option<Price>,
1589 ) -> bool {
1590 match order {
1591 OrderAny::TrailingStopMarket(inner) => {
1592 if inner.is_activated {
1593 return true;
1594 }
1595
1596 if inner.activation_price.is_none() {
1597 let px = match inner.order_side() {
1598 OrderSide::Buy => ask,
1599 OrderSide::Sell => bid,
1600 _ => None,
1601 };
1602 if let Some(p) = px {
1603 inner.activation_price = Some(p);
1604 inner.set_activated();
1605 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1606 log::error!("Failed to update order: {e}");
1607 }
1608 return true;
1609 }
1610 return false;
1611 }
1612
1613 let activation_price = inner.activation_price.unwrap();
1614 let hit = match inner.order_side() {
1615 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1616 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1617 _ => false,
1618 };
1619 if hit {
1620 inner.set_activated();
1621 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1622 log::error!("Failed to update order: {e}");
1623 }
1624 }
1625 hit
1626 }
1627 OrderAny::TrailingStopLimit(inner) => {
1628 if inner.is_activated {
1629 return true;
1630 }
1631
1632 if inner.activation_price.is_none() {
1633 let px = match inner.order_side() {
1634 OrderSide::Buy => ask,
1635 OrderSide::Sell => bid,
1636 _ => None,
1637 };
1638 if let Some(p) = px {
1639 inner.activation_price = Some(p);
1640 inner.set_activated();
1641 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1642 log::error!("Failed to update order: {e}");
1643 }
1644 return true;
1645 }
1646 return false;
1647 }
1648
1649 let activation_price = inner.activation_price.unwrap();
1650 let hit = match inner.order_side() {
1651 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1652 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1653 _ => false,
1654 };
1655 if hit {
1656 inner.set_activated();
1657 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1658 log::error!("Failed to update order: {e}");
1659 }
1660 }
1661 hit
1662 }
1663 _ => true,
1664 }
1665 }
1666
1667 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1668 for order in orders {
1669 if order.is_closed() {
1670 continue;
1671 }
1672
1673 if self.config.support_gtd_orders
1674 && order
1675 .expire_time()
1676 .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
1677 {
1678 let _ = self.core.delete_order(order);
1679 self.cached_filled_qty.remove(&order.client_order_id());
1680 self.expire_order(order);
1681 continue;
1682 }
1683
1684 if matches!(
1685 order,
1686 PassiveOrderAny::Stop(
1687 StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_)
1688 )
1689 ) {
1690 let mut any = OrderAny::from(order.clone());
1691
1692 if !self.maybe_activate_trailing_stop(&mut any, self.core.bid, self.core.ask) {
1693 continue;
1694 }
1695
1696 self.update_trailing_stop_order(&mut any);
1697
1698 let _ = self.core.delete_order(order);
1700 let _ = self
1701 .core
1702 .add_order(PassiveOrderAny::try_from(any).expect("passive order conversion"));
1703 }
1704
1705 if let Some(target_bid) = self.target_bid {
1707 self.core.bid = Some(target_bid);
1708 self.target_bid = None;
1709 }
1710 if let Some(target_bid) = self.target_bid.take() {
1711 self.core.bid = Some(target_bid);
1712 self.target_bid = None;
1713 }
1714 if let Some(target_ask) = self.target_ask.take() {
1715 self.core.ask = Some(target_ask);
1716 self.target_ask = None;
1717 }
1718 if let Some(target_last) = self.target_last.take() {
1719 self.core.last = Some(target_last);
1720 self.target_last = None;
1721 }
1722 }
1723
1724 self.target_bid = None;
1726 self.target_ask = None;
1727 self.target_last = None;
1728 }
1729
1730 fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1731 match order.price() {
1732 Some(order_price) => {
1733 let mut fills = if self.config.liquidity_consumption {
1738 let size_prec = self.instrument.size_precision();
1739 self.book
1740 .get_all_crossed_levels(order.order_side(), order_price, size_prec)
1741 } else {
1742 let book_order =
1743 BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
1744 self.book.simulate_fills(&book_order)
1745 };
1746
1747 if let Some(trade_size) = self.last_trade_size
1749 && let Some(trade_price) = self.core.last
1750 {
1751 let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);
1752
1753 if !fills_at_trade_price
1754 && self
1755 .core
1756 .is_limit_matched(order.order_side_specified(), order_price)
1757 {
1758 let leaves_qty = order.leaves_qty();
1759
1760 let available_qty = if self.config.liquidity_consumption {
1762 let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
1763 Quantity::from_raw(remaining, trade_size.precision)
1764 } else {
1765 trade_size
1766 };
1767
1768 let fill_qty = min(leaves_qty, available_qty);
1769
1770 if !fill_qty.is_zero() {
1771 log::debug!(
1772 "Trade execution fill: {} @ {} (available: {}, book had {} fills)",
1773 fill_qty,
1774 trade_price,
1775 available_qty,
1776 fills.len()
1777 );
1778
1779 fills = vec![(trade_price, fill_qty)];
1780
1781 if self.config.liquidity_consumption {
1782 self.trade_consumption += fill_qty.raw;
1783 }
1784 }
1785 }
1786 }
1787
1788 if fills.is_empty() {
1790 return fills;
1791 }
1792
1793 if let Some(triggered_price) = order.trigger_price() {
1795 if order
1797 .liquidity_side()
1798 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
1799 {
1800 if order.order_side() == OrderSide::Sell && order_price > triggered_price {
1801 let first_fill = fills.first().unwrap();
1803 let triggered_qty = first_fill.1;
1804 fills[0] = (triggered_price, triggered_qty);
1805 self.target_bid = self.core.bid;
1806 self.target_ask = self.core.ask;
1807 self.target_last = self.core.last;
1808 self.core.set_ask_raw(order_price);
1809 self.core.set_last_raw(order_price);
1810 } else if order.order_side() == OrderSide::Buy
1811 && order_price < triggered_price
1812 {
1813 let first_fill = fills.first().unwrap();
1815 let triggered_qty = first_fill.1;
1816 fills[0] = (triggered_price, triggered_qty);
1817 self.target_bid = self.core.bid;
1818 self.target_ask = self.core.ask;
1819 self.target_last = self.core.last;
1820 self.core.set_bid_raw(order_price);
1821 self.core.set_last_raw(order_price);
1822 }
1823 }
1824 }
1825
1826 if order
1828 .liquidity_side()
1829 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1830 {
1831 match order.order_side().as_specified() {
1832 OrderSideSpecified::Buy => {
1833 let target_price = if order
1834 .trigger_price()
1835 .is_some_and(|trigger_price| order_price > trigger_price)
1836 {
1837 order.trigger_price().unwrap()
1838 } else {
1839 order_price
1840 };
1841 for fill in &fills {
1842 let last_px = fill.0;
1843 if last_px < order_price {
1844 self.target_bid = self.core.bid;
1846 self.target_ask = self.core.ask;
1847 self.target_last = self.core.last;
1848 self.core.set_ask_raw(target_price);
1849 self.core.set_last_raw(target_price);
1850 }
1851 }
1852 }
1853 OrderSideSpecified::Sell => {
1854 let target_price = if order
1855 .trigger_price()
1856 .is_some_and(|trigger_price| order_price < trigger_price)
1857 {
1858 order.trigger_price().unwrap()
1859 } else {
1860 order_price
1861 };
1862 for fill in &fills {
1863 let last_px = fill.0;
1864 if last_px > order_price {
1865 self.target_bid = self.core.bid;
1867 self.target_ask = self.core.ask;
1868 self.target_last = self.core.last;
1869 self.core.set_bid_raw(target_price);
1870 self.core.set_last_raw(target_price);
1871 }
1872 }
1873 }
1874 }
1875 }
1876
1877 self.apply_liquidity_consumption(fills, order.order_side(), order.leaves_qty())
1878 }
1879 None => panic!("Limit order must have a price"),
1880 }
1881 }
1882
1883 fn determine_market_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1884 let price = match order.order_side().as_specified() {
1885 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1886 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1887 };
1888
1889 let fills = if self.config.liquidity_consumption {
1892 let size_prec = self.instrument.size_precision();
1893 self.book
1894 .get_all_crossed_levels(order.order_side(), price, size_prec)
1895 } else {
1896 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1897 self.book.simulate_fills(&book_order)
1898 };
1899
1900 self.apply_liquidity_consumption(fills, order.order_side(), order.leaves_qty())
1901 }
1902
1903 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1908 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
1909 && filled_qty >= &order.quantity()
1910 {
1911 log::info!(
1912 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1913 filled_qty,
1914 order.quantity(),
1915 order.filled_qty(),
1916 order.quantity()
1917 );
1918 return;
1919 }
1920
1921 let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1922 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1923 let cache = self.cache.as_ref().borrow();
1924 cache.position(&venue_position_id).cloned()
1925 } else {
1926 None
1927 };
1928
1929 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1930 log::warn!(
1931 "Canceling REDUCE_ONLY {} as would increase position",
1932 order.order_type()
1933 );
1934 self.cancel_order(order, None);
1935 return;
1936 }
1937 order.set_liquidity_side(LiquiditySide::Taker);
1939 let fills = self.determine_market_price_and_volume(order);
1940 self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1941 }
1942
1943 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1952 match order.price() {
1953 Some(order_price) => {
1954 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1955 if let Some(&qty) = cached_filled_qty
1956 && qty >= order.quantity()
1957 {
1958 log::debug!(
1959 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1960 qty,
1961 order.quantity(),
1962 order.filled_qty(),
1963 order.leaves_qty(),
1964 );
1965 return;
1966 }
1967
1968 if order
1969 .liquidity_side()
1970 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1971 {
1972 if order.order_side() == OrderSide::Buy
1973 && self.core.bid.is_some_and(|bid| bid == order_price)
1974 && !self.fill_model.is_limit_filled()
1975 {
1976 return;
1978 }
1979 if order.order_side() == OrderSide::Sell
1980 && self.core.ask.is_some_and(|ask| ask == order_price)
1981 && !self.fill_model.is_limit_filled()
1982 {
1983 return;
1985 }
1986 }
1987
1988 let venue_position_id = self.ids_generator.get_position_id(order, None);
1989 let position = if let Some(venue_position_id) = venue_position_id {
1990 let cache = self.cache.as_ref().borrow();
1991 cache.position(&venue_position_id).cloned()
1992 } else {
1993 None
1994 };
1995
1996 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1997 log::warn!(
1998 "Canceling REDUCE_ONLY {} as would increase position",
1999 order.order_type()
2000 );
2001 self.cancel_order(order, None);
2002 return;
2003 }
2004
2005 let fills = self.determine_limit_price_and_volume(order);
2006
2007 if fills.is_empty() {
2011 return;
2012 }
2013
2014 self.apply_fills(
2015 order,
2016 fills,
2017 order.liquidity_side().unwrap(),
2018 venue_position_id,
2019 position,
2020 );
2021 }
2022 None => panic!("Limit order must have a price"),
2023 }
2024 }
2025
2026 fn apply_fills(
2027 &mut self,
2028 order: &mut OrderAny,
2029 fills: Vec<(Price, Quantity)>,
2030 liquidity_side: LiquiditySide,
2031 venue_position_id: Option<PositionId>,
2032 position: Option<Position>,
2033 ) {
2034 if order.time_in_force() == TimeInForce::Fok {
2035 let mut total_size = Quantity::zero(order.quantity().precision);
2036 for (fill_px, fill_qty) in &fills {
2037 total_size = total_size.add(*fill_qty);
2038 }
2039
2040 if order.leaves_qty() > total_size {
2041 self.cancel_order(order, None);
2042 return;
2043 }
2044 }
2045
2046 if fills.is_empty() {
2047 if order.status() == OrderStatus::Submitted {
2048 self.generate_order_rejected(
2049 order,
2050 format!("No market for {}", order.instrument_id()).into(),
2051 );
2052 } else {
2053 log::error!(
2054 "Cannot fill order: no fills from book when fills were expected (check size in data)"
2055 );
2056 return;
2057 }
2058 }
2059
2060 let venue_position_id = if self.oms_type == OmsType::Netting {
2062 None
2063 } else {
2064 venue_position_id
2065 };
2066
2067 let mut initial_market_to_limit_fill = false;
2068
2069 for &(mut fill_px, ref fill_qty) in &fills {
2070 assert!(
2071 (fill_px.precision == self.instrument.price_precision()),
2072 "Invalid price precision for fill price {} when instrument price precision is {}.\
2073 Check that the data price precision matches the {} instrument",
2074 fill_px.precision,
2075 self.instrument.price_precision(),
2076 self.instrument.id()
2077 );
2078
2079 assert!(
2080 (fill_qty.precision == self.instrument.size_precision()),
2081 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
2082 Check that the data quantity precision matches the {} instrument",
2083 fill_qty.precision,
2084 self.instrument.size_precision(),
2085 self.instrument.id()
2086 );
2087
2088 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
2089 && order.order_type() == OrderType::MarketToLimit
2090 {
2091 self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
2092 initial_market_to_limit_fill = true;
2093 }
2094
2095 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
2096 fill_px = match order.order_side().as_specified() {
2097 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
2098 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
2099 }
2100 }
2101
2102 let mut effective_fill_qty = *fill_qty;
2106
2107 if self.config.use_reduce_only
2108 && order.is_reduce_only()
2109 && let Some(position) = &position
2110 && *fill_qty > position.quantity
2111 {
2112 if position.quantity == Quantity::zero(position.quantity.precision) {
2113 return;
2115 }
2116
2117 let adjusted_fill_qty =
2119 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
2120
2121 effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
2123
2124 if order.quantity() != adjusted_fill_qty {
2126 self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
2127 }
2128 }
2129
2130 if fill_qty.is_zero() {
2131 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
2132 self.generate_order_rejected(
2133 order,
2134 format!("No market for {}", order.instrument_id()).into(),
2135 );
2136 }
2137 return;
2138 }
2139
2140 self.fill_order(
2141 order,
2142 fill_px,
2143 effective_fill_qty,
2144 liquidity_side,
2145 venue_position_id,
2146 position.clone(),
2147 );
2148
2149 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
2150 return;
2152 }
2153 }
2154
2155 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
2156 self.cancel_order(order, None);
2158 return;
2159 }
2160
2161 if order.is_open()
2162 && self.book_type == BookType::L1_MBP
2163 && matches!(
2164 order.order_type(),
2165 OrderType::Market
2166 | OrderType::MarketIfTouched
2167 | OrderType::StopMarket
2168 | OrderType::TrailingStopMarket
2169 )
2170 {
2171 todo!("Exhausted simulated book volume")
2175 }
2176 }
2177
2178 fn fill_order(
2179 &mut self,
2180 order: &mut OrderAny,
2181 last_px: Price,
2182 last_qty: Quantity,
2183 liquidity_side: LiquiditySide,
2184 venue_position_id: Option<PositionId>,
2185 position: Option<Position>,
2186 ) {
2187 let size_prec = self.instrument.size_precision();
2188 let instrument_id = self.instrument.id();
2189 assert!(
2190 last_qty.precision == size_prec,
2191 "Invalid fill quantity precision {}, expected {size_prec} for {instrument_id}",
2192 last_qty.precision
2193 );
2194
2195 match self.cached_filled_qty.get(&order.client_order_id()) {
2196 Some(filled_qty) => {
2197 let leaves_qty = order.quantity().saturating_sub(*filled_qty);
2199 let last_qty = min(last_qty, leaves_qty);
2200 let new_filled_qty = *filled_qty + last_qty;
2201 self.cached_filled_qty
2203 .insert(order.client_order_id(), new_filled_qty);
2204 }
2205 None => {
2206 self.cached_filled_qty
2207 .insert(order.client_order_id(), last_qty);
2208 }
2209 }
2210
2211 let commission = self
2213 .fee_model
2214 .get_commission(order, last_qty, last_px, &self.instrument)
2215 .unwrap();
2216
2217 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2218 self.generate_order_filled(
2219 order,
2220 venue_order_id,
2221 venue_position_id,
2222 last_qty,
2223 last_px,
2224 self.instrument.quote_currency(),
2225 commission,
2226 liquidity_side,
2227 );
2228
2229 if order.is_passive() && order.is_closed() {
2230 if self.core.order_exists(order.client_order_id()) {
2232 let _ = self.core.delete_order(
2233 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
2234 );
2235 }
2236 self.cached_filled_qty.remove(&order.client_order_id());
2237 }
2238
2239 if !self.config.support_contingent_orders {
2240 return;
2241 }
2242
2243 if let Some(contingency_type) = order.contingency_type() {
2244 match contingency_type {
2245 ContingencyType::Oto => {
2246 if let Some(linked_orders_ids) = order.linked_order_ids() {
2247 for client_order_id in linked_orders_ids {
2248 let mut child_order = match self.cache.borrow().order(client_order_id) {
2249 Some(child_order) => child_order.clone(),
2250 None => panic!("Order {client_order_id} not found in cache"),
2251 };
2252
2253 if child_order.is_closed() || child_order.is_active_local() {
2254 continue;
2255 }
2256
2257 if let (None, Some(position_id)) =
2259 (child_order.position_id(), order.position_id())
2260 {
2261 self.cache
2262 .borrow_mut()
2263 .add_position_id(
2264 &position_id,
2265 &self.venue,
2266 client_order_id,
2267 &child_order.strategy_id(),
2268 )
2269 .unwrap();
2270 log::debug!(
2271 "Added position id {position_id} to cache for order {client_order_id}"
2272 );
2273 }
2274
2275 if (!child_order.is_open())
2276 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
2277 && child_order
2278 .previous_status()
2279 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
2280 {
2281 let account_id = order.account_id().unwrap_or_else(|| {
2282 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
2283 panic!(
2284 "Account ID not found for trader {}",
2285 order.trader_id()
2286 )
2287 })
2288 });
2289 self.process_order(&mut child_order, account_id);
2290 }
2291 }
2292 } else {
2293 log::error!(
2294 "OTO order {} does not have linked orders",
2295 order.client_order_id()
2296 );
2297 }
2298 }
2299 ContingencyType::Oco => {
2300 if let Some(linked_orders_ids) = order.linked_order_ids() {
2301 for client_order_id in linked_orders_ids {
2302 let child_order = match self.cache.borrow().order(client_order_id) {
2303 Some(child_order) => child_order.clone(),
2304 None => panic!("Order {client_order_id} not found in cache"),
2305 };
2306
2307 if child_order.is_closed() || child_order.is_active_local() {
2308 continue;
2309 }
2310
2311 self.cancel_order(&child_order, None);
2312 }
2313 } else {
2314 log::error!(
2315 "OCO order {} does not have linked orders",
2316 order.client_order_id()
2317 );
2318 }
2319 }
2320 ContingencyType::Ouo => {
2321 if let Some(linked_orders_ids) = order.linked_order_ids() {
2322 for client_order_id in linked_orders_ids {
2323 let mut child_order = match self.cache.borrow().order(client_order_id) {
2324 Some(child_order) => child_order.clone(),
2325 None => panic!("Order {client_order_id} not found in cache"),
2326 };
2327
2328 if child_order.is_active_local() {
2329 continue;
2330 }
2331
2332 if order.is_closed() && child_order.is_open() {
2333 self.cancel_order(&child_order, None);
2334 } else if !order.leaves_qty().is_zero()
2335 && order.leaves_qty() != child_order.leaves_qty()
2336 {
2337 let price = child_order.price();
2338 let trigger_price = child_order.trigger_price();
2339 self.update_order(
2340 &mut child_order,
2341 Some(order.leaves_qty()),
2342 price,
2343 trigger_price,
2344 Some(false),
2345 );
2346 }
2347 }
2348 } else {
2349 log::error!(
2350 "OUO order {} does not have linked orders",
2351 order.client_order_id()
2352 );
2353 }
2354 }
2355 _ => {}
2356 }
2357 }
2358 }
2359
2360 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
2361 if self
2362 .core
2363 .is_limit_matched(order.order_side_specified(), price)
2364 {
2365 if order.is_post_only() {
2366 self.generate_order_modify_rejected(
2367 order.trader_id(),
2368 order.strategy_id(),
2369 order.instrument_id(),
2370 order.client_order_id(),
2371 Ustr::from(format!(
2372 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2373 order.order_type(),
2374 order.order_side(),
2375 price,
2376 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2377 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2378 ).as_str()),
2379 order.venue_order_id(),
2380 order.account_id(),
2381 );
2382 return;
2383 }
2384
2385 self.generate_order_updated(order, quantity, Some(price), None, None);
2386 order.set_liquidity_side(LiquiditySide::Taker);
2387 self.fill_limit_order(order);
2388 return;
2389 }
2390 self.generate_order_updated(order, quantity, Some(price), None, None);
2391 }
2392
2393 fn update_stop_market_order(
2394 &mut self,
2395 order: &mut OrderAny,
2396 quantity: Quantity,
2397 trigger_price: Price,
2398 ) {
2399 if self
2400 .core
2401 .is_stop_matched(order.order_side_specified(), trigger_price)
2402 {
2403 self.generate_order_modify_rejected(
2404 order.trader_id(),
2405 order.strategy_id(),
2406 order.instrument_id(),
2407 order.client_order_id(),
2408 Ustr::from(
2409 format!(
2410 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
2411 order.order_type(),
2412 order.order_side(),
2413 trigger_price,
2414 self.core
2415 .bid
2416 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2417 self.core
2418 .ask
2419 .map_or_else(|| "None".to_string(), |p| p.to_string())
2420 )
2421 .as_str(),
2422 ),
2423 order.venue_order_id(),
2424 order.account_id(),
2425 );
2426 return;
2427 }
2428
2429 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
2430 }
2431
2432 fn update_stop_limit_order(
2433 &mut self,
2434 order: &mut OrderAny,
2435 quantity: Quantity,
2436 price: Price,
2437 trigger_price: Price,
2438 ) {
2439 if order.is_triggered().is_some_and(|t| t) {
2440 if self
2442 .core
2443 .is_limit_matched(order.order_side_specified(), price)
2444 {
2445 if order.is_post_only() {
2446 self.generate_order_modify_rejected(
2447 order.trader_id(),
2448 order.strategy_id(),
2449 order.instrument_id(),
2450 order.client_order_id(),
2451 Ustr::from(format!(
2452 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2453 order.order_type(),
2454 order.order_side(),
2455 price,
2456 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2457 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2458 ).as_str()),
2459 order.venue_order_id(),
2460 order.account_id(),
2461 );
2462 return;
2463 }
2464 self.generate_order_updated(order, quantity, Some(price), None, None);
2465 order.set_liquidity_side(LiquiditySide::Taker);
2466 self.fill_limit_order(order);
2467 return; }
2469 } else {
2470 if self
2472 .core
2473 .is_stop_matched(order.order_side_specified(), trigger_price)
2474 {
2475 self.generate_order_modify_rejected(
2476 order.trader_id(),
2477 order.strategy_id(),
2478 order.instrument_id(),
2479 order.client_order_id(),
2480 Ustr::from(
2481 format!(
2482 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
2483 order.order_type(),
2484 order.order_side(),
2485 trigger_price,
2486 self.core
2487 .bid
2488 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2489 self.core
2490 .ask
2491 .map_or_else(|| "None".to_string(), |p| p.to_string())
2492 )
2493 .as_str(),
2494 ),
2495 order.venue_order_id(),
2496 order.account_id(),
2497 );
2498 return;
2499 }
2500 }
2501
2502 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
2503 }
2504
2505 fn update_market_if_touched_order(
2506 &mut self,
2507 order: &mut OrderAny,
2508 quantity: Quantity,
2509 trigger_price: Price,
2510 ) {
2511 if self
2512 .core
2513 .is_touch_triggered(order.order_side_specified(), trigger_price)
2514 {
2515 self.generate_order_modify_rejected(
2516 order.trader_id(),
2517 order.strategy_id(),
2518 order.instrument_id(),
2519 order.client_order_id(),
2520 Ustr::from(
2521 format!(
2522 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2523 order.order_type(),
2524 order.order_side(),
2525 trigger_price,
2526 self.core
2527 .bid
2528 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2529 self.core
2530 .ask
2531 .map_or_else(|| "None".to_string(), |p| p.to_string())
2532 )
2533 .as_str(),
2534 ),
2535 order.venue_order_id(),
2536 order.account_id(),
2537 );
2538 return;
2540 }
2541
2542 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
2543 }
2544
2545 fn update_limit_if_touched_order(
2546 &mut self,
2547 order: &mut OrderAny,
2548 quantity: Quantity,
2549 price: Price,
2550 trigger_price: Price,
2551 ) {
2552 if order.is_triggered().is_some_and(|t| t) {
2553 if self
2555 .core
2556 .is_limit_matched(order.order_side_specified(), price)
2557 {
2558 if order.is_post_only() {
2559 self.generate_order_modify_rejected(
2560 order.trader_id(),
2561 order.strategy_id(),
2562 order.instrument_id(),
2563 order.client_order_id(),
2564 Ustr::from(format!(
2565 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2566 order.order_type(),
2567 order.order_side(),
2568 price,
2569 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2570 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2571 ).as_str()),
2572 order.venue_order_id(),
2573 order.account_id(),
2574 );
2575 return;
2577 }
2578 self.generate_order_updated(order, quantity, Some(price), None, None);
2579 order.set_liquidity_side(LiquiditySide::Taker);
2580 self.fill_limit_order(order);
2581 return;
2582 }
2583 } else {
2584 if self
2586 .core
2587 .is_touch_triggered(order.order_side_specified(), trigger_price)
2588 {
2589 self.generate_order_modify_rejected(
2590 order.trader_id(),
2591 order.strategy_id(),
2592 order.instrument_id(),
2593 order.client_order_id(),
2594 Ustr::from(
2595 format!(
2596 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2597 order.order_type(),
2598 order.order_side(),
2599 trigger_price,
2600 self.core
2601 .bid
2602 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2603 self.core
2604 .ask
2605 .map_or_else(|| "None".to_string(), |p| p.to_string())
2606 )
2607 .as_str(),
2608 ),
2609 order.venue_order_id(),
2610 order.account_id(),
2611 );
2612 return;
2613 }
2614 }
2615
2616 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
2617 }
2618
2619 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
2620 let (new_trigger_price, new_price) = trailing_stop_calculate(
2621 self.instrument.price_increment(),
2622 order.trigger_price(),
2623 order.activation_price(),
2624 order,
2625 self.core.bid,
2626 self.core.ask,
2627 self.core.last,
2628 )
2629 .unwrap();
2630
2631 if new_trigger_price.is_none() && new_price.is_none() {
2632 return;
2633 }
2634
2635 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
2636 }
2637
2638 fn update_protection_price(&mut self, order: &mut OrderAny) {
2639 let protection_price = protection_price_calculate(
2640 self.instrument.price_increment(),
2641 order,
2642 self.config.price_protection_points,
2643 self.core.bid,
2644 self.core.ask,
2645 );
2646
2647 if let Ok(protection_price) = protection_price {
2648 self.generate_order_updated(
2649 order,
2650 order.quantity(),
2651 None,
2652 None,
2653 Some(protection_price),
2654 );
2655 }
2656 }
2657
2658 fn accept_order(&mut self, order: &mut OrderAny) {
2661 if order.is_closed() {
2662 return;
2664 }
2665 if order.status() != OrderStatus::Accepted {
2666 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2667 self.generate_order_accepted(order, venue_order_id);
2668
2669 if matches!(
2670 order.order_type(),
2671 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
2672 ) && order.trigger_price().is_none()
2673 {
2674 self.update_trailing_stop_order(order);
2675 }
2676 }
2677
2678 let _ = self.core.add_order(
2679 PassiveOrderAny::try_from(order.to_owned()).expect("passive order conversion"),
2680 );
2681 }
2682
2683 fn expire_order(&mut self, order: &PassiveOrderAny) {
2684 if self.config.support_contingent_orders
2685 && order
2686 .contingency_type()
2687 .is_some_and(|c| c != ContingencyType::NoContingency)
2688 {
2689 self.cancel_contingent_orders(&OrderAny::from(order.clone()));
2690 }
2691
2692 self.generate_order_expired(&order.to_any());
2693 }
2694
2695 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
2696 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
2697 if order.is_active_local() {
2698 log::error!(
2699 "Cannot cancel an order with {} from the matching engine",
2700 order.status()
2701 );
2702 return;
2703 }
2704
2705 if self.core.order_exists(order.client_order_id()) {
2707 let _ = self.core.delete_order(
2708 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
2709 );
2710 }
2711 self.cached_filled_qty.remove(&order.client_order_id());
2712
2713 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2714 self.generate_order_canceled(order, venue_order_id);
2715
2716 if self.config.support_contingent_orders
2717 && order.contingency_type().is_some()
2718 && order.contingency_type().unwrap() != ContingencyType::NoContingency
2719 && cancel_contingencies
2720 {
2721 self.cancel_contingent_orders(order);
2722 }
2723 }
2724
2725 fn update_order(
2726 &mut self,
2727 order: &mut OrderAny,
2728 quantity: Option<Quantity>,
2729 price: Option<Price>,
2730 trigger_price: Option<Price>,
2731 update_contingencies: Option<bool>,
2732 ) -> bool {
2733 let update_contingencies = update_contingencies.unwrap_or(true);
2734 let quantity = quantity.unwrap_or(order.quantity());
2735
2736 let price_prec = self.instrument.price_precision();
2737 let size_prec = self.instrument.size_precision();
2738 let instrument_id = self.instrument.id();
2739 if quantity.precision != size_prec {
2740 self.generate_order_modify_rejected(
2741 order.trader_id(),
2742 order.strategy_id(),
2743 order.instrument_id(),
2744 order.client_order_id(),
2745 Ustr::from(&format!(
2746 "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
2747 quantity.precision
2748 )),
2749 order.venue_order_id(),
2750 order.account_id(),
2751 );
2752 return false;
2753 }
2754 if let Some(px) = price
2755 && px.precision != price_prec
2756 {
2757 self.generate_order_modify_rejected(
2758 order.trader_id(),
2759 order.strategy_id(),
2760 order.instrument_id(),
2761 order.client_order_id(),
2762 Ustr::from(&format!(
2763 "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
2764 px.precision
2765 )),
2766 order.venue_order_id(),
2767 order.account_id(),
2768 );
2769 return false;
2770 }
2771 if let Some(tp) = trigger_price
2772 && tp.precision != price_prec
2773 {
2774 self.generate_order_modify_rejected(
2775 order.trader_id(),
2776 order.strategy_id(),
2777 order.instrument_id(),
2778 order.client_order_id(),
2779 Ustr::from(&format!(
2780 "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
2781 tp.precision
2782 )),
2783 order.venue_order_id(),
2784 order.account_id(),
2785 );
2786 return false;
2787 }
2788
2789 let filled_qty = self
2791 .cached_filled_qty
2792 .get(&order.client_order_id())
2793 .copied()
2794 .unwrap_or(order.filled_qty());
2795 if quantity < filled_qty {
2796 self.generate_order_modify_rejected(
2797 order.trader_id(),
2798 order.strategy_id(),
2799 order.instrument_id(),
2800 order.client_order_id(),
2801 Ustr::from(&format!(
2802 "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
2803 )),
2804 order.venue_order_id(),
2805 order.account_id(),
2806 );
2807 return false;
2808 }
2809
2810 match order {
2811 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2812 let price = price.unwrap_or(order.price().unwrap());
2813 self.update_limit_order(order, quantity, price);
2814 }
2815 OrderAny::StopMarket(_) => {
2816 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2817 self.update_stop_market_order(order, quantity, trigger_price);
2818 }
2819 OrderAny::StopLimit(_) => {
2820 let price = price.unwrap_or(order.price().unwrap());
2821 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2822 self.update_stop_limit_order(order, quantity, price, trigger_price);
2823 }
2824 OrderAny::MarketIfTouched(_) => {
2825 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2826 self.update_market_if_touched_order(order, quantity, trigger_price);
2827 }
2828 OrderAny::LimitIfTouched(_) => {
2829 let price = price.unwrap_or(order.price().unwrap());
2830 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2831 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2832 }
2833 OrderAny::TrailingStopMarket(_) => {
2834 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2835 self.update_market_if_touched_order(order, quantity, trigger_price);
2836 }
2837 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2838 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2839 let trigger_price =
2840 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2841 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2842 }
2843 _ => {
2844 panic!(
2845 "Unsupported order type {} for update_order",
2846 order.order_type()
2847 );
2848 }
2849 }
2850
2851 let new_leaves_qty = quantity.saturating_sub(filled_qty);
2853 if new_leaves_qty.is_zero() {
2854 if self.config.support_contingent_orders
2855 && order
2856 .contingency_type()
2857 .is_some_and(|c| c != ContingencyType::NoContingency)
2858 && update_contingencies
2859 {
2860 self.update_contingent_order(order);
2861 }
2862 self.cancel_order(order, Some(false));
2864 return true;
2865 }
2866
2867 if self.config.support_contingent_orders
2868 && order
2869 .contingency_type()
2870 .is_some_and(|c| c != ContingencyType::NoContingency)
2871 && update_contingencies
2872 {
2873 self.update_contingent_order(order);
2874 }
2875
2876 true
2877 }
2878
2879 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
2881 todo!("trigger_stop_order")
2882 }
2883
2884 fn update_contingent_order(&mut self, order: &OrderAny) {
2885 log::debug!("Updating OUO orders from {}", order.client_order_id());
2886 if let Some(linked_order_ids) = order.linked_order_ids() {
2887 let parent_filled_qty = self
2888 .cached_filled_qty
2889 .get(&order.client_order_id())
2890 .copied()
2891 .unwrap_or(order.filled_qty());
2892 let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
2893
2894 for client_order_id in linked_order_ids {
2895 let mut child_order = match self.cache.borrow().order(client_order_id) {
2896 Some(order) => order.clone(),
2897 None => panic!("Order {client_order_id} not found in cache."),
2898 };
2899
2900 if child_order.is_active_local() {
2901 continue;
2902 }
2903
2904 let child_filled_qty = self
2905 .cached_filled_qty
2906 .get(&child_order.client_order_id())
2907 .copied()
2908 .unwrap_or(child_order.filled_qty());
2909
2910 if parent_leaves_qty.is_zero() {
2911 self.cancel_order(&child_order, Some(false));
2912 } else if child_filled_qty >= parent_leaves_qty {
2913 self.cancel_order(&child_order, Some(false));
2915 } else {
2916 let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
2917 if child_leaves_qty != parent_leaves_qty {
2918 let price = child_order.price();
2919 let trigger_price = child_order.trigger_price();
2920 self.update_order(
2921 &mut child_order,
2922 Some(parent_leaves_qty),
2923 price,
2924 trigger_price,
2925 Some(false),
2926 );
2927 }
2928 }
2929 }
2930 }
2931 }
2932
2933 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2934 if let Some(linked_order_ids) = order.linked_order_ids() {
2935 for client_order_id in linked_order_ids {
2936 let contingent_order = match self.cache.borrow().order(client_order_id) {
2937 Some(order) => order.clone(),
2938 None => panic!("Cannot find contingent order for {client_order_id}"),
2939 };
2940 if contingent_order.is_active_local() {
2941 continue;
2943 }
2944 if !contingent_order.is_closed() {
2945 self.cancel_order(&contingent_order, Some(false));
2946 }
2947 }
2948 }
2949 }
2950
2951 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2954 let ts_now = self.clock.borrow().timestamp_ns();
2955 let account_id = order
2956 .account_id()
2957 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2958
2959 let due_post_only = reason.as_str().starts_with("POST_ONLY");
2961
2962 let event = OrderEventAny::Rejected(OrderRejected::new(
2963 order.trader_id(),
2964 order.strategy_id(),
2965 order.instrument_id(),
2966 order.client_order_id(),
2967 account_id,
2968 reason,
2969 UUID4::new(),
2970 ts_now,
2971 ts_now,
2972 false,
2973 due_post_only,
2974 ));
2975 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2976 }
2977
2978 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2979 let ts_now = self.clock.borrow().timestamp_ns();
2980 let account_id = order
2981 .account_id()
2982 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2983 let event = OrderEventAny::Accepted(OrderAccepted::new(
2984 order.trader_id(),
2985 order.strategy_id(),
2986 order.instrument_id(),
2987 order.client_order_id(),
2988 venue_order_id,
2989 account_id,
2990 UUID4::new(),
2991 ts_now,
2992 ts_now,
2993 false,
2994 ));
2995 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2996
2997 order.apply(event).expect("Failed to apply order event");
2999 }
3000
3001 #[allow(clippy::too_many_arguments)]
3002 fn generate_order_modify_rejected(
3003 &self,
3004 trader_id: TraderId,
3005 strategy_id: StrategyId,
3006 instrument_id: InstrumentId,
3007 client_order_id: ClientOrderId,
3008 reason: Ustr,
3009 venue_order_id: Option<VenueOrderId>,
3010 account_id: Option<AccountId>,
3011 ) {
3012 let ts_now = self.clock.borrow().timestamp_ns();
3013 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
3014 trader_id,
3015 strategy_id,
3016 instrument_id,
3017 client_order_id,
3018 reason,
3019 UUID4::new(),
3020 ts_now,
3021 ts_now,
3022 false,
3023 venue_order_id,
3024 account_id,
3025 ));
3026 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3027 }
3028
3029 #[allow(clippy::too_many_arguments)]
3030 fn generate_order_cancel_rejected(
3031 &self,
3032 trader_id: TraderId,
3033 strategy_id: StrategyId,
3034 account_id: AccountId,
3035 instrument_id: InstrumentId,
3036 client_order_id: ClientOrderId,
3037 venue_order_id: Option<VenueOrderId>,
3038 reason: Ustr,
3039 ) {
3040 let ts_now = self.clock.borrow().timestamp_ns();
3041 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
3042 trader_id,
3043 strategy_id,
3044 instrument_id,
3045 client_order_id,
3046 reason,
3047 UUID4::new(),
3048 ts_now,
3049 ts_now,
3050 false,
3051 venue_order_id,
3052 Some(account_id),
3053 ));
3054 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3055 }
3056
3057 fn generate_order_updated(
3058 &self,
3059 order: &mut OrderAny,
3060 quantity: Quantity,
3061 price: Option<Price>,
3062 trigger_price: Option<Price>,
3063 protection_price: Option<Price>,
3064 ) {
3065 let ts_now = self.clock.borrow().timestamp_ns();
3066 let event = OrderEventAny::Updated(OrderUpdated::new(
3067 order.trader_id(),
3068 order.strategy_id(),
3069 order.instrument_id(),
3070 order.client_order_id(),
3071 quantity,
3072 UUID4::new(),
3073 ts_now,
3074 ts_now,
3075 false,
3076 order.venue_order_id(),
3077 order.account_id(),
3078 price,
3079 trigger_price,
3080 protection_price,
3081 ));
3082 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3083
3084 order.apply(event).expect("Failed to apply order event");
3086 }
3087
3088 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
3089 let ts_now = self.clock.borrow().timestamp_ns();
3090 let event = OrderEventAny::Canceled(OrderCanceled::new(
3091 order.trader_id(),
3092 order.strategy_id(),
3093 order.instrument_id(),
3094 order.client_order_id(),
3095 UUID4::new(),
3096 ts_now,
3097 ts_now,
3098 false,
3099 Some(venue_order_id),
3100 order.account_id(),
3101 ));
3102 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3103 }
3104
3105 fn generate_order_triggered(&self, order: &OrderAny) {
3106 let ts_now = self.clock.borrow().timestamp_ns();
3107 let event = OrderEventAny::Triggered(OrderTriggered::new(
3108 order.trader_id(),
3109 order.strategy_id(),
3110 order.instrument_id(),
3111 order.client_order_id(),
3112 UUID4::new(),
3113 ts_now,
3114 ts_now,
3115 false,
3116 order.venue_order_id(),
3117 order.account_id(),
3118 ));
3119 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3120 }
3121
3122 fn generate_order_expired(&self, order: &OrderAny) {
3123 let ts_now = self.clock.borrow().timestamp_ns();
3124 let event = OrderEventAny::Expired(OrderExpired::new(
3125 order.trader_id(),
3126 order.strategy_id(),
3127 order.instrument_id(),
3128 order.client_order_id(),
3129 UUID4::new(),
3130 ts_now,
3131 ts_now,
3132 false,
3133 order.venue_order_id(),
3134 order.account_id(),
3135 ));
3136 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3137 }
3138
3139 #[allow(clippy::too_many_arguments)]
3140 fn generate_order_filled(
3141 &mut self,
3142 order: &mut OrderAny,
3143 venue_order_id: VenueOrderId,
3144 venue_position_id: Option<PositionId>,
3145 last_qty: Quantity,
3146 last_px: Price,
3147 quote_currency: Currency,
3148 commission: Money,
3149 liquidity_side: LiquiditySide,
3150 ) {
3151 debug_assert!(
3152 last_qty <= order.quantity(),
3153 "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
3154 order_qty = order.quantity(),
3155 client_order_id = order.client_order_id()
3156 );
3157
3158 let ts_now = self.clock.borrow().timestamp_ns();
3159 let account_id = order
3160 .account_id()
3161 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3162 let event = OrderEventAny::Filled(OrderFilled::new(
3163 order.trader_id(),
3164 order.strategy_id(),
3165 order.instrument_id(),
3166 order.client_order_id(),
3167 venue_order_id,
3168 account_id,
3169 self.ids_generator.generate_trade_id(),
3170 order.order_side(),
3171 order.order_type(),
3172 last_qty,
3173 last_px,
3174 quote_currency,
3175 liquidity_side,
3176 UUID4::new(),
3177 ts_now,
3178 ts_now,
3179 false,
3180 venue_position_id,
3181 Some(commission),
3182 ));
3183 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3184
3185 order.apply(event).expect("Failed to apply order event");
3187 }
3188}