1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 cell::RefCell,
22 cmp::min,
23 fmt::Debug,
24 ops::{Add, Sub},
25 rc::Rc,
26};
27
28use ahash::AHashMap;
29use chrono::TimeDelta;
30use nautilus_common::{
31 cache::Cache,
32 clock::Clock,
33 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
34 msgbus::{self, MessagingSwitchboard},
35};
36use nautilus_core::{UUID4, UnixNanos};
37use nautilus_model::{
38 data::{
39 Bar, BarType, InstrumentClose, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
40 QuoteTick, TradeTick, order::BookOrder,
41 },
42 enums::{
43 AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
44 InstrumentCloseType, LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide,
45 OrderSideSpecified, OrderStatus, OrderType, PositionSide, PriceType, TimeInForce,
46 TriggerType,
47 },
48 events::{
49 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
50 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
51 },
52 identifiers::{
53 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
54 VenueOrderId,
55 },
56 instruments::{Instrument, InstrumentAny},
57 orderbook::OrderBook,
58 orders::{MarketOrder, Order, OrderAny, OrderCore},
59 position::Position,
60 types::{
61 Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
62 quantity::QuantityRaw,
63 },
64};
65use ustr::Ustr;
66
67use crate::{
68 matching_core::{OrderMatchInfo, OrderMatchingCore},
69 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
70 models::{
71 fee::{FeeModel, FeeModelAny},
72 fill::{FillModel, FillModelAny},
73 },
74 protection::protection_price_calculate,
75 trailing::trailing_stop_calculate,
76};
77
/// Order matching engine state for a single instrument.
///
/// Bundles the instrument's order book, the matching core, the fill and fee
/// models, and the bookkeeping maps used for liquidity-consumption and
/// queue-position tracking.
pub struct OrderMatchingEngine {
    /// Venue of the instrument (taken from `instrument.id().venue` in `new`).
    pub venue: Venue,
    /// Instrument this engine matches orders for.
    pub instrument: InstrumentAny,
    /// Raw integer ID supplied at construction; also passed to the IDs generator.
    pub raw_id: u32,
    /// Book type the internal order book was created with.
    pub book_type: BookType,
    /// OMS type supplied at construction; also passed to the IDs generator.
    pub oms_type: OmsType,
    /// Account type supplied at construction.
    pub account_type: AccountType,
    /// Current market status; starts as `MarketStatus::Open`.
    pub market_status: MarketStatus,
    /// Engine configuration flags.
    pub config: OrderMatchingEngineConfig,
    /// Matching core holding the bid/ask/last prices and the open orders.
    core: OrderMatchingCore,
    /// Shared clock used to obtain the current timestamp.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders and positions.
    cache: Rc<RefCell<Cache>>,
    /// Order book maintained from incoming market data.
    book: OrderBook,
    /// Fill model (replaceable through `set_fill_model`).
    fill_model: FillModelAny,
    /// Fee model supplied at construction.
    fee_model: FeeModelAny,
    /// Initialized to `None`. NOTE(review): usage is outside the visible chunk.
    target_bid: Option<Price>,
    /// Initialized to `None`. NOTE(review): usage is outside the visible chunk.
    target_ask: Option<Price>,
    /// Initialized to `None`. NOTE(review): usage is outside the visible chunk.
    target_last: Option<Price>,
    /// Most recent bid-priced bar awaiting its matching ask bar.
    last_bar_bid: Option<Bar>,
    /// Most recent ask-priced bar awaiting its matching bid bar.
    last_bar_ask: Option<Bar>,
    /// Toggled while bar data is replayed as ticks; `true` by default.
    fill_at_market: bool,
    /// Bar type currently selected for execution, per instrument.
    execution_bar_types: AHashMap<InstrumentId, BarType>,
    /// Cached time delta of each bar type seen by `process_bar`.
    execution_bar_deltas: AHashMap<BarType, TimeDelta>,
    /// Account ID registered for each trader (filled in `process_order`).
    account_ids: AHashMap<TraderId, AccountId>,
    /// Cleared on reset. NOTE(review): usage is outside the visible chunk.
    cached_filled_qty: AHashMap<ClientOrderId, Quantity>,
    /// Generator for trade IDs and venue order IDs.
    ids_generator: IdsGenerator,
    /// Size of the trade currently being processed; `None` outside `process_trade_tick`.
    last_trade_size: Option<Quantity>,
    /// Per price level: `(level size last seen, quantity consumed)`, used for sell orders.
    bid_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
    /// Per price level: `(level size last seen, quantity consumed)`, used for buy orders.
    ask_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
    /// Raw quantity of the current trade already consumed; reset around each trade.
    trade_consumption: QuantityRaw,
    /// Per tracked order: `(order price, quantity still ahead in the queue)`.
    queue_ahead: AHashMap<ClientOrderId, (PriceRaw, QuantityRaw)>,
    /// Per order: trade quantity left over for it, as computed by the last trade.
    queue_excess: AHashMap<ClientOrderId, QuantityRaw>,
    /// Pending contract-expiry close, consumed by `check_instrument_expiration`.
    instrument_close: Option<InstrumentClose>,
    /// Set once expiration handling has run so it is not repeated.
    expiration_processed: bool,
}
123
124impl Debug for OrderMatchingEngine {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct(stringify!(OrderMatchingEngine))
127 .field("venue", &self.venue)
128 .field("instrument", &self.instrument.id())
129 .finish()
130 }
131}
132
133impl OrderMatchingEngine {
134 #[allow(clippy::too_many_arguments)]
136 pub fn new(
137 instrument: InstrumentAny,
138 raw_id: u32,
139 fill_model: FillModelAny,
140 fee_model: FeeModelAny,
141 book_type: BookType,
142 oms_type: OmsType,
143 account_type: AccountType,
144 clock: Rc<RefCell<dyn Clock>>,
145 cache: Rc<RefCell<Cache>>,
146 config: OrderMatchingEngineConfig,
147 ) -> Self {
148 let book = OrderBook::new(instrument.id(), book_type);
149 let core = OrderMatchingCore::new(
150 instrument.id(),
151 instrument.price_increment(),
152 None, None, None, );
156 let ids_generator = IdsGenerator::new(
157 instrument.id().venue,
158 oms_type,
159 raw_id,
160 config.use_random_ids,
161 config.use_position_ids,
162 cache.clone(),
163 );
164
165 Self {
166 venue: instrument.id().venue,
167 instrument,
168 raw_id,
169 fill_model,
170 fee_model,
171 book_type,
172 oms_type,
173 account_type,
174 clock,
175 cache,
176 book,
177 market_status: MarketStatus::Open,
178 config,
179 core,
180 target_bid: None,
181 target_ask: None,
182 target_last: None,
183 last_bar_bid: None,
184 last_bar_ask: None,
185 fill_at_market: true,
186 execution_bar_types: AHashMap::new(),
187 execution_bar_deltas: AHashMap::new(),
188 account_ids: AHashMap::new(),
189 cached_filled_qty: AHashMap::new(),
190 ids_generator,
191 last_trade_size: None,
192 bid_consumption: AHashMap::new(),
193 ask_consumption: AHashMap::new(),
194 trade_consumption: 0,
195 queue_ahead: AHashMap::new(),
196 queue_excess: AHashMap::new(),
197 instrument_close: None,
198 expiration_processed: false,
199 }
200 }
201
202 pub fn reset(&mut self) {
208 self.book.reset();
209 self.execution_bar_types.clear();
210 self.execution_bar_deltas.clear();
211 self.account_ids.clear();
212 self.cached_filled_qty.clear();
213 self.core.reset();
214 self.target_bid = None;
215 self.target_ask = None;
216 self.target_last = None;
217 self.last_trade_size = None;
218 self.bid_consumption.clear();
219 self.ask_consumption.clear();
220 self.trade_consumption = 0;
221 self.queue_ahead.clear();
222 self.queue_excess.clear();
223 self.instrument_close = None;
224 self.expiration_processed = false;
225 self.fill_at_market = true;
226 self.ids_generator.reset();
227
228 log::info!("Reset {}", self.instrument.id());
229 }
230
    /// Caps proposed fills by the liquidity not yet consumed at each book level.
    ///
    /// Returns `fills` unchanged when `config.liquidity_consumption` is off or
    /// when `order_side` is neither `Buy` nor `Sell`. Otherwise each fill is
    /// limited to the smallest of: its own quantity, the unconsumed quantity at
    /// its book level, and what is left of `leaves_qty`. Fills that end up with
    /// zero quantity are dropped.
    ///
    /// `book_prices`, when given, supplies by index the book price to track
    /// consumption against; the fill's own price is used where no entry exists.
    fn apply_liquidity_consumption(
        &mut self,
        fills: Vec<(Price, Quantity)>,
        order_side: OrderSide,
        leaves_qty: Quantity,
        book_prices: Option<&[Price]>,
    ) -> Vec<(Price, Quantity)> {
        if !self.config.liquidity_consumption {
            return fills;
        }

        // Buy orders track consumption in the ask map, sell orders in the bid map
        let consumption = match order_side {
            OrderSide::Buy => &mut self.ask_consumption,
            OrderSide::Sell => &mut self.bid_consumption,
            _ => return fills,
        };

        let mut adjusted_fills = Vec::with_capacity(fills.len());
        let mut remaining_qty = leaves_qty.raw;

        for (fill_idx, (price, qty)) in fills.into_iter().enumerate() {
            if remaining_qty == 0 {
                break;
            }

            // Consumption is keyed by the book price, which may differ from the fill price
            let book_price = book_prices
                .and_then(|bp| bp.get(fill_idx).copied())
                .unwrap_or(price);

            let book_price_raw = book_price.raw;
            let level_size = self
                .book
                .get_quantity_at_level(book_price, order_side, qty.precision);

            let (original_size, consumed) = consumption
                .entry(book_price_raw)
                .or_insert((level_size.raw, 0));

            // The level size changed since it was recorded: start tracking afresh
            if *original_size != level_size.raw {
                *original_size = level_size.raw;
                *consumed = 0;
            }

            let available = original_size.saturating_sub(*consumed);
            if available == 0 {
                continue;
            }

            let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
            if adjusted_qty_raw == 0 {
                continue;
            }

            *consumed += adjusted_qty_raw;
            remaining_qty -= adjusted_qty_raw;

            // The adjusted fill keeps the original fill price, not the book price
            let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
            adjusted_fills.push((price, adjusted_qty));
        }

        adjusted_fills
    }
296
    /// Replaces the engine's fill model with `fill_model`.
    pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
        self.fill_model = fill_model;
    }
301
302 fn snapshot_queue_position(&mut self, order: &OrderAny, price: Price) {
303 if !self.config.queue_position {
304 return;
305 }
306 let size_prec = self.instrument.size_precision();
307
308 let qty_ahead = self.book.get_quantity_at_level(
311 price,
312 OrderCore::opposite_side(order.order_side()),
313 size_prec,
314 );
315 self.queue_ahead
316 .insert(order.client_order_id(), (price.raw, qty_ahead.raw));
317 }
318
    /// Advances tracked queue positions for a trade at `price_raw`.
    ///
    /// Does nothing unless `config.queue_position` is enabled. Otherwise it
    /// clears `queue_excess`, drops tracked orders that are closed or missing
    /// from the cache, then walks the affected orders in ascending order of
    /// quantity ahead, spending `trade_size_raw` on the queue in front of each
    /// order and recording any part of the trade that reaches the order itself
    /// in `queue_excess`.
    ///
    /// An order is affected when it is tracked at `price_raw` with a non-zero
    /// quantity ahead and the aggressor is `NoAggressor` or on the other side
    /// (buyer vs sell order, seller vs buy order).
    fn decrement_queue_on_trade(
        &mut self,
        price_raw: PriceRaw,
        trade_size_raw: QuantityRaw,
        aggressor_side: AggressorSide,
    ) {
        if !self.config.queue_position {
            return;
        }

        self.queue_excess.clear();

        // Snapshot the keys so the map can be modified while walking the orders
        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
        // (client order ID, quantity ahead, order leaves quantity)
        let mut entries: Vec<(ClientOrderId, QuantityRaw, QuantityRaw)> = Vec::new();
        let mut stale: Vec<ClientOrderId> = Vec::new();

        for client_order_id in keys {
            let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
            {
                Some(v) => v,
                None => continue,
            };

            // Closed orders and orders missing from the cache yield `None`
            let cache = self.cache.borrow();
            let order_info = cache.order(&client_order_id).and_then(|order| {
                if order.is_closed() {
                    None
                } else {
                    Some((order.order_side(), order.leaves_qty().raw))
                }
            });
            drop(cache);

            let Some((order_side, leaves_raw)) = order_info else {
                stale.push(client_order_id);
                continue;
            };

            // Only orders at the trade price that still have quantity ahead are affected
            if order_price_raw != price_raw || ahead_raw == 0 {
                continue;
            }

            let should_decrement = matches!(aggressor_side, AggressorSide::NoAggressor)
                || (aggressor_side == AggressorSide::Buyer && order_side == OrderSide::Sell)
                || (aggressor_side == AggressorSide::Seller && order_side == OrderSide::Buy);

            if should_decrement {
                entries.push((client_order_id, ahead_raw, leaves_raw));
            }
        }

        for id in stale {
            self.queue_ahead.remove(&id);
        }

        // Orders with the least quantity ahead are processed first
        entries.sort_by_key(|&(_, ahead, _)| ahead);

        let mut remaining = trade_size_raw;
        let mut prev_position: QuantityRaw = 0;

        for (client_order_id, ahead_raw, leaves_raw) in &entries {
            if remaining == 0 {
                // Trade fully spent: reduce the quantity ahead by the whole trade size
                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
                self.queue_ahead
                    .insert(*client_order_id, (price_raw, new_ahead));
                if new_ahead == 0 {
                    // Zero excess makes `determine_trade_fill_qty` return `None` for this trade
                    self.queue_excess.insert(*client_order_id, 0);
                }
                continue;
            }

            // Quantity between the previously processed position and this order
            let gap = ahead_raw.saturating_sub(prev_position);
            let queue_consumed = remaining.min(gap);
            remaining -= queue_consumed;

            if remaining == 0 && queue_consumed < gap {
                // The trade ran out before reaching this order
                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
                self.queue_ahead
                    .insert(*client_order_id, (price_raw, new_ahead));
                continue;
            }

            // The trade reached this order: nothing is ahead any more and the
            // leftover, capped at the order's leaves quantity, becomes its excess
            self.queue_ahead.insert(*client_order_id, (price_raw, 0));
            let excess = remaining.min(*leaves_raw);
            self.queue_excess.insert(*client_order_id, excess);
            remaining -= excess;
            prev_position = ahead_raw + excess;
        }
    }
411
    /// Returns the raw quantity `order` may fill right now, or `None` if it may not fill.
    ///
    /// With `config.queue_position` disabled this is simply the order's leaves
    /// quantity. Otherwise:
    /// - an order still tracked with quantity ahead at its own price fills only
    ///   if a trade is being processed and the last price is strictly through
    ///   the order price (below for buys, above for sells);
    /// - the result is capped by the unconsumed part of the current trade and
    ///   by the order's `queue_excess` entry, when those exist;
    /// - a zero result is reported as `None`.
    fn determine_trade_fill_qty(&self, order: &OrderAny) -> Option<QuantityRaw> {
        if !self.config.queue_position {
            return Some(order.leaves_qty().raw);
        }

        let client_order_id = order.client_order_id();

        // Order is still queued behind other quantity at its tracked price
        if let Some(&(tracked_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
            && let Some(order_price) = order.price()
            && order_price.raw == tracked_price_raw
            && ahead_raw > 0
        {
            // Requires an in-flight trade whose price is strictly through the order price
            let crossed = self.last_trade_size.is_some()
                && self
                    .core
                    .last
                    .is_some_and(|trade_price| match order.order_side() {
                        OrderSide::Buy => trade_price.raw < order_price.raw,
                        OrderSide::Sell => trade_price.raw > order_price.raw,
                        _ => false,
                    });
            if !crossed {
                return None;
            }
        }

        let leaves_raw = order.leaves_qty().raw;
        if leaves_raw == 0 {
            return None;
        }

        let mut available_raw = leaves_raw;

        if let Some(trade_size) = self.last_trade_size {
            // Cap by what is left of the trade currently being processed
            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
            available_raw = available_raw.min(remaining);

            if let Some(&excess_raw) = self.queue_excess.get(&client_order_id) {
                if excess_raw == 0 {
                    return None;
                }
                available_raw = available_raw.min(excess_raw);
            }
        }

        if available_raw == 0 {
            return None;
        }

        Some(available_raw)
    }
465
466 fn clear_all_queue_positions(&mut self) {
467 for (_, (_, ahead_raw)) in &mut self.queue_ahead {
468 *ahead_raw = 0;
469 }
470 }
471
472 fn clear_queue_on_delete(&mut self, deleted_price_raw: PriceRaw, deleted_side: OrderSide) {
473 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
474 for client_order_id in keys {
475 if let Some(&(order_price_raw, _)) = self.queue_ahead.get(&client_order_id)
476 && order_price_raw == deleted_price_raw
477 {
478 let cache = self.cache.borrow();
479 if let Some(order) = cache.order(&client_order_id)
480 && order.order_side() == deleted_side
481 {
482 drop(cache);
483 self.queue_ahead
484 .insert(client_order_id, (order_price_raw, 0));
485 }
486 }
487 }
488 }
489
490 fn cap_queue_on_update(
491 &mut self,
492 price_raw: PriceRaw,
493 new_size_raw: QuantityRaw,
494 side: OrderSide,
495 ) {
496 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
497 for client_order_id in keys {
498 if let Some(&(order_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
499 && order_price_raw == price_raw
500 && ahead_raw > new_size_raw
501 {
502 let cache = self.cache.borrow();
503 if let Some(order) = cache.order(&client_order_id)
504 && order.order_side() == side
505 {
506 drop(cache);
507 self.queue_ahead
508 .insert(client_order_id, (order_price_raw, new_size_raw));
509 }
510 }
511 }
512 }
513
    /// Returns the best bid price of the engine's order book, if any.
    #[must_use]
    pub fn best_bid_price(&self) -> Option<Price> {
        self.book.best_bid_price()
    }
519
    /// Returns the best ask price of the engine's order book, if any.
    #[must_use]
    pub fn best_ask_price(&self) -> Option<Price> {
        self.book.best_ask_price()
    }
525
    /// Returns a reference to the engine's order book.
    #[must_use]
    pub const fn get_book(&self) -> &OrderBook {
        &self.book
    }
531
    /// Returns the bid-side orders held by the matching core.
    #[must_use]
    pub const fn get_open_bid_orders(&self) -> &[OrderMatchInfo] {
        self.core.get_orders_bid()
    }
537
    /// Returns the ask-side orders held by the matching core.
    #[must_use]
    pub const fn get_open_ask_orders(&self) -> &[OrderMatchInfo] {
        self.core.get_orders_ask()
    }
543
544 #[must_use]
545 pub fn get_open_orders(&self) -> Vec<OrderMatchInfo> {
547 let mut orders = Vec::new();
548 orders.extend_from_slice(self.core.get_orders_bid());
549 orders.extend_from_slice(self.core.get_orders_ask());
550 orders
551 }
552
    /// Returns whether the matching core holds an order with `client_order_id`.
    #[must_use]
    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
        self.core.order_exists(client_order_id)
    }
558
    /// Returns a shared reference to the matching core.
    #[must_use]
    pub const fn get_core(&self) -> &OrderMatchingCore {
        &self.core
    }
563
    /// Returns a mutable reference to the matching core.
    pub fn get_core_mut(&mut self) -> &mut OrderMatchingCore {
        &mut self.core
    }
567
    /// Sets the `fill_at_market` flag to `value`.
    pub fn set_fill_at_market(&mut self, value: bool) {
        self.fill_at_market = value;
    }
571
572 fn check_price_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
575 let expected = self.instrument.price_precision();
576 if actual != expected {
577 anyhow::bail!(
578 "Invalid {field} precision {actual}, expected {expected} for {}",
579 self.instrument.id()
580 );
581 }
582 Ok(())
583 }
584
585 fn check_size_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
586 let expected = self.instrument.size_precision();
587 if actual != expected {
588 anyhow::bail!(
589 "Invalid {field} precision {actual}, expected {expected} for {}",
590 self.instrument.id()
591 );
592 }
593 Ok(())
594 }
595
596 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
604 log::debug!("Processing {delta}");
605
606 if matches!(delta.action, BookAction::Add | BookAction::Update) {
608 self.check_price_precision(delta.order.price.precision, "delta order price")?;
609 self.check_size_precision(delta.order.size.precision, "delta order size")?;
610 }
611
612 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
613 self.book.apply_delta(delta)?;
614 }
615
616 if self.config.queue_position {
617 if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
618 self.clear_all_queue_positions();
619 } else if delta.action == BookAction::Delete {
620 self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
621 } else if delta.action == BookAction::Update {
622 self.cap_queue_on_update(
623 delta.order.price.raw,
624 delta.order.size.raw,
625 delta.order.side,
626 );
627 }
628 }
629
630 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
631 Ok(())
632 }
633
634 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
642 log::debug!("Processing {deltas}");
643
644 for delta in &deltas.deltas {
646 if matches!(delta.action, BookAction::Add | BookAction::Update) {
647 self.check_price_precision(delta.order.price.precision, "delta order price")?;
648 self.check_size_precision(delta.order.size.precision, "delta order size")?;
649 }
650 }
651
652 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
653 self.book.apply_deltas(deltas)?;
654 }
655
656 if self.config.queue_position {
657 for delta in &deltas.deltas {
658 if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
659 self.clear_all_queue_positions();
660 break;
661 } else if delta.action == BookAction::Delete {
662 self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
663 } else if delta.action == BookAction::Update {
664 self.cap_queue_on_update(
665 delta.order.price.raw,
666 delta.order.size.raw,
667 delta.order.side,
668 );
669 }
670 }
671 }
672
673 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
674 Ok(())
675 }
676
677 pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) -> anyhow::Result<()> {
686 log::debug!("Processing OrderBookDepth10 for {}", depth.instrument_id);
687
688 for order in &depth.bids {
690 if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
691 continue;
692 }
693 self.check_price_precision(order.price.precision, "bid price")?;
694 self.check_size_precision(order.size.precision, "bid size")?;
695 }
696 for order in &depth.asks {
697 if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
698 continue;
699 }
700 self.check_price_precision(order.price.precision, "ask price")?;
701 self.check_size_precision(order.size.precision, "ask size")?;
702 }
703
704 if self.book_type == BookType::L1_MBP {
707 let quote = QuoteTick::new(
708 depth.instrument_id,
709 depth.bids[0].price,
710 depth.asks[0].price,
711 depth.bids[0].size,
712 depth.asks[0].size,
713 depth.ts_event,
714 depth.ts_init,
715 );
716 self.book.update_quote_tick("e)?;
717 } else {
718 self.book.apply_depth(depth)?;
719 }
720
721 if self.config.queue_position {
723 self.clear_all_queue_positions();
724 }
725
726 self.iterate(depth.ts_init, AggressorSide::NoAggressor);
727 Ok(())
728 }
729
730 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
738 log::debug!("Processing {quote}");
739
740 self.check_price_precision(quote.bid_price.precision, "bid_price")
741 .unwrap();
742 self.check_price_precision(quote.ask_price.precision, "ask_price")
743 .unwrap();
744 self.check_size_precision(quote.bid_size.precision, "bid_size")
745 .unwrap();
746 self.check_size_precision(quote.ask_size.precision, "ask_size")
747 .unwrap();
748
749 if self.book_type == BookType::L1_MBP {
750 self.book.update_quote_tick(quote).unwrap();
751 }
752
753 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
754 }
755
756 pub fn process_bar(&mut self, bar: &Bar) {
767 log::debug!("Processing {bar}");
768
769 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
771 return;
772 }
773
774 let bar_type = bar.bar_type;
775 if bar_type.aggregation_source() == AggregationSource::Internal {
777 return;
778 }
779
780 self.check_price_precision(bar.open.precision, "bar open")
781 .unwrap();
782 self.check_price_precision(bar.high.precision, "bar high")
783 .unwrap();
784 self.check_price_precision(bar.low.precision, "bar low")
785 .unwrap();
786 self.check_price_precision(bar.close.precision, "bar close")
787 .unwrap();
788 self.check_size_precision(bar.volume.precision, "bar volume")
789 .unwrap();
790
791 let execution_bar_type =
792 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
793 execution_bar_type.to_owned()
794 } else {
795 self.execution_bar_types
796 .insert(bar.instrument_id(), bar_type);
797 self.execution_bar_deltas
798 .insert(bar_type, bar_type.spec().timedelta());
799 bar_type
800 };
801
802 if execution_bar_type != bar_type {
803 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
804 if bar_type_timedelta.is_none() {
805 bar_type_timedelta = Some(bar_type.spec().timedelta());
806 self.execution_bar_deltas
807 .insert(bar_type, bar_type_timedelta.unwrap());
808 }
809 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
810 >= &bar_type_timedelta.unwrap()
811 {
812 self.execution_bar_types
813 .insert(bar_type.instrument_id(), bar_type);
814 } else {
815 return;
816 }
817 }
818
819 match bar_type.spec().price_type {
820 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
821 PriceType::Bid => {
822 self.last_bar_bid = Some(bar.to_owned());
823 self.process_quote_ticks_from_bar(bar);
824 }
825 PriceType::Ask => {
826 self.last_bar_ask = Some(bar.to_owned());
827 self.process_quote_ticks_from_bar(bar);
828 }
829 PriceType::Mark => panic!("Not implemented"),
830 }
831 }
832
    /// Replays a bar as up to four synthetic trade ticks: open, high and low
    /// (in either order), then close.
    ///
    /// Each tick is emitted only when its price differs from the core's last
    /// price in the relevant direction. Open, high and low ticks carry a
    /// quarter of the bar volume; the close tick carries a quarter plus the
    /// division remainder. `fill_at_market` is `true` for the open tick,
    /// `false` for the others, and restored to `true` at the end.
    ///
    /// # Panics
    ///
    /// Panics if the book rejects a synthetic trade tick.
    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
        // Split the volume into quarters; the close tick takes the remainder too
        let quarter_raw = bar.volume.raw / 4;
        let remainder_raw = bar.volume.raw % 4;
        let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
        let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);

        // Buyer when there is no last price yet or the open is above it
        let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
        {
            AggressorSide::Buyer
        } else {
            AggressorSide::Seller
        };

        // A single tick is created here and mutated for each subsequent price point
        let mut trade_tick = TradeTick::new(
            bar.instrument_id(),
            bar.open,
            size,
            aggressor_side,
            self.ids_generator.generate_trade_id(),
            bar.ts_init,
            bar.ts_init,
        );

        // Open: emitted when no last price exists yet or the open differs from it
        if !self.core.is_last_initialized {
            self.fill_at_market = true;
            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
            self.core.set_last_raw(trade_tick.price);
        } else if self.core.last.is_some_and(|last| bar.open != last) {
            self.fill_at_market = true;
            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
            self.core.set_last_raw(trade_tick.price);
        }

        // High goes first unless adaptive ordering is on and the low is at
        // least as close to the open as the high
        let high_first = !self.config.bar_adaptive_high_low_ordering
            || (bar.high.raw - bar.open.raw).abs() < (bar.low.raw - bar.open.raw).abs();

        if high_first {
            self.process_bar_high(&mut trade_tick, bar);
            self.process_bar_low(&mut trade_tick, bar);
        } else {
            self.process_bar_low(&mut trade_tick, bar);
            self.process_bar_high(&mut trade_tick, bar);
        }

        // Close: emitted only when it differs from the last price
        if self.core.last.is_some_and(|last| bar.close != last) {
            self.fill_at_market = false;
            trade_tick.price = bar.close;
            trade_tick.size = close_size;
            if bar.close > self.core.last.unwrap() {
                trade_tick.aggressor_side = AggressorSide::Buyer;
            } else {
                trade_tick.aggressor_side = AggressorSide::Seller;
            }
            trade_tick.trade_id = self.ids_generator.generate_trade_id();

            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);

            self.core.set_last_raw(trade_tick.price);
        }

        self.fill_at_market = true;
    }
905
906 fn process_bar_high(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
907 if self.core.last.is_some_and(|last| bar.high > last) {
908 self.fill_at_market = false;
909 trade_tick.price = bar.high;
910 trade_tick.aggressor_side = AggressorSide::Buyer;
911 trade_tick.trade_id = self.ids_generator.generate_trade_id();
912
913 self.book.update_trade_tick(trade_tick).unwrap();
914 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
915
916 self.core.set_last_raw(trade_tick.price);
917 }
918 }
919
920 fn process_bar_low(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
921 if self.core.last.is_some_and(|last| bar.low < last) {
922 self.fill_at_market = false;
923 trade_tick.price = bar.low;
924 trade_tick.aggressor_side = AggressorSide::Seller;
925 trade_tick.trade_id = self.ids_generator.generate_trade_id();
926
927 self.book.update_trade_tick(trade_tick).unwrap();
928 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
929
930 self.core.set_last_raw(trade_tick.price);
931 }
932 }
933
934 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
935 if self.last_bar_bid.is_none()
937 || self.last_bar_ask.is_none()
938 || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
939 {
940 return;
941 }
942 let bid_bar = self.last_bar_bid.unwrap();
943 let ask_bar = self.last_bar_ask.unwrap();
944
945 let bid_quarter = bid_bar.volume.raw / 4;
947 let bid_remainder = bid_bar.volume.raw % 4;
948 let ask_quarter = ask_bar.volume.raw / 4;
949 let ask_remainder = ask_bar.volume.raw % 4;
950
951 let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
952 let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
953 let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
954 let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
955
956 let mut quote_tick = QuoteTick::new(
958 self.book.instrument_id,
959 bid_bar.open,
960 ask_bar.open,
961 bid_size,
962 ask_size,
963 bid_bar.ts_init,
964 bid_bar.ts_init,
965 );
966
967 self.fill_at_market = true;
969 self.book.update_quote_tick("e_tick).unwrap();
970 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
971
972 self.fill_at_market = false;
974 quote_tick.bid_price = bid_bar.high;
975 quote_tick.ask_price = ask_bar.high;
976 self.book.update_quote_tick("e_tick).unwrap();
977 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
978
979 self.fill_at_market = false;
981 quote_tick.bid_price = bid_bar.low;
982 quote_tick.ask_price = ask_bar.low;
983 self.book.update_quote_tick("e_tick).unwrap();
984 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
985
986 self.fill_at_market = false;
988 quote_tick.bid_price = bid_bar.close;
989 quote_tick.ask_price = ask_bar.close;
990 quote_tick.bid_size = bid_close_size;
991 quote_tick.ask_size = ask_close_size;
992 self.book.update_quote_tick("e_tick).unwrap();
993 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
994
995 self.last_bar_bid = None;
996 self.last_bar_ask = None;
997 self.fill_at_market = true;
998 }
999
    /// Processes a trade tick, optionally matching resting orders against it.
    ///
    /// The trade always updates the core's last price, and the book as well
    /// for an `L1_MBP` book type. When `config.trade_execution` is disabled
    /// the core's bid/ask are synced from the book (for `L1_MBP`) and the
    /// function returns. Otherwise the core's bid/ask are adjusted around the
    /// trade price, queue positions are decremented, a matching iteration
    /// runs, and any temporary bid/ask override is restored.
    ///
    /// # Panics
    ///
    /// Panics if the trade's price or size precision does not match the
    /// instrument, or if the book rejects the trade.
    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
        log::debug!("Processing {trade}");

        self.check_price_precision(trade.price.precision, "trade price")
            .unwrap();
        self.check_size_precision(trade.size.precision, "trade size")
            .unwrap();

        let price_raw = trade.price.raw;

        if self.book_type == BookType::L1_MBP {
            self.book.update_trade_tick(trade).unwrap();
        }

        self.core.set_last_raw(trade.price);

        if !self.config.trade_execution {
            // Without trade execution, only mirror the book's top of book into the core
            if self.book_type == BookType::L1_MBP {
                if let Some(bid) = self.book.best_bid_price() {
                    self.core.set_bid_raw(bid);
                }
                if let Some(ask) = self.book.best_ask_price() {
                    self.core.set_ask_raw(ask);
                }
            }
            return;
        }

        let aggressor_side = trade.aggressor_side;

        // Widen the core's bid/ask so the trade price lies within them; the
        // no-aggressor case also resets a side that equals the trade price
        match aggressor_side {
            AggressorSide::Buyer => {
                if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
                    self.core.set_ask_raw(trade.price);
                }
                if self.core.bid.is_none()
                    || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                {
                    self.core.set_bid_raw(trade.price);
                }
            }
            AggressorSide::Seller => {
                if self.core.bid.is_none()
                    || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                {
                    self.core.set_bid_raw(trade.price);
                }
                if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
                    self.core.set_ask_raw(trade.price);
                }
            }
            AggressorSide::NoAggressor => {
                if self.core.bid.is_none()
                    || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                {
                    self.core.set_bid_raw(trade.price);
                }
                if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
                    self.core.set_ask_raw(trade.price);
                }
            }
        }

        // Remember the adjusted bid/ask so the temporary override below can be undone
        let original_bid = self.core.bid;
        let original_ask = self.core.ask;

        // Temporarily move one side to the trade price for the matching iteration:
        // the ask for a seller trade below it, the bid for a buyer trade above it,
        // and both sides when there is no aggressor
        match aggressor_side {
            AggressorSide::Seller => {
                if original_ask.is_some_and(|ask| price_raw < ask.raw) {
                    self.core.set_ask_raw(trade.price);
                }
            }
            AggressorSide::Buyer => {
                if original_bid.is_some_and(|bid| price_raw > bid.raw) {
                    self.core.set_bid_raw(trade.price);
                }
            }
            AggressorSide::NoAggressor => {
                self.core.set_bid_raw(trade.price);
                self.core.set_ask_raw(trade.price);
            }
        }

        // Per-trade state, visible to fill sizing during the iteration
        self.last_trade_size = Some(trade.size);
        self.trade_consumption = 0;

        self.decrement_queue_on_trade(price_raw, trade.size.raw, aggressor_side);
        self.iterate(trade.ts_init, aggressor_side);

        self.last_trade_size = None;
        self.trade_consumption = 0;

        // Undo the temporary override; the no-aggressor case keeps both sides at the trade price
        match aggressor_side {
            AggressorSide::Seller => {
                if let Some(ask) = original_ask
                    && price_raw < ask.raw
                {
                    self.core.ask = Some(ask);
                }
            }
            AggressorSide::Buyer => {
                if let Some(bid) = original_bid
                    && price_raw > bid.raw
                {
                    self.core.bid = Some(bid);
                }
            }
            AggressorSide::NoAggressor => {}
        }
    }
1125
1126 pub fn process_status(&mut self, action: MarketStatusAction) {
1128 log::debug!("Processing {action}");
1129
1130 if self.market_status == MarketStatus::Closed
1132 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
1133 {
1134 self.market_status = MarketStatus::Open;
1135 }
1136 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
1138 self.market_status = MarketStatus::Paused;
1139 }
1140 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
1142 self.market_status = MarketStatus::Suspended;
1143 }
1144 if self.market_status == MarketStatus::Open
1146 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
1147 {
1148 self.market_status = MarketStatus::Closed;
1149 }
1150 }
1151
1152 pub fn process_instrument_close(&mut self, close: InstrumentClose) {
1157 if close.instrument_id != self.instrument.id() {
1158 log::warn!(
1159 "Received instrument close for unknown instrument_id: {}",
1160 close.instrument_id
1161 );
1162 return;
1163 }
1164
1165 if close.close_type == InstrumentCloseType::ContractExpired {
1166 self.instrument_close = Some(close);
1167 self.iterate(close.ts_init, AggressorSide::NoAggressor);
1168 }
1169 }
1170
    /// Handles a pending contract expiration, at most once.
    ///
    /// Does nothing if expiration was already processed or no instrument close
    /// is stored. Otherwise it cancels every open order found in the cache,
    /// then, for each open position on the instrument, creates a market order
    /// on the closing side, accepts it and fills it at the close price as taker.
    ///
    /// # Panics
    ///
    /// Panics if a venue order ID cannot be obtained for a closing order.
    fn check_instrument_expiration(&mut self) {
        if self.expiration_processed || self.instrument_close.is_none() {
            return;
        }

        self.expiration_processed = true;
        // Checked above: `instrument_close` is `Some` here
        let close = self.instrument_close.take().unwrap();
        log::info!("{} reached expiration", self.instrument.id());

        // Cancel all open orders; orders missing from the cache are skipped
        let open_orders: Vec<OrderMatchInfo> = self.get_open_orders();
        for order_info in &open_orders {
            // Clone inside a block so the cache borrow ends before `cancel_order`
            let order = {
                let cache = self.cache.borrow();
                cache.order(&order_info.client_order_id).cloned()
            };
            if let Some(order) = order {
                self.cancel_order(&order, None);
            }
        }

        // Collect what is needed from each open position so the cache borrow
        // is released before any order is created
        let instrument_id = self.instrument.id();
        let positions: Vec<(TraderId, StrategyId, PositionId, OrderSide, Quantity)> = {
            let cache = self.cache.borrow();
            cache
                .positions_open(None, Some(&instrument_id), None, None, None)
                .into_iter()
                .map(|pos| {
                    let closing_side = match pos.side {
                        PositionSide::Long => OrderSide::Sell,
                        PositionSide::Short => OrderSide::Buy,
                        _ => OrderSide::NoOrderSide,
                    };
                    (
                        pos.trader_id,
                        pos.strategy_id,
                        pos.id,
                        closing_side,
                        pos.quantity,
                    )
                })
                .collect()
        };

        let ts_now = self.clock.borrow().timestamp_ns();

        for (trader_id, strategy_id, position_id, closing_side, quantity) in positions {
            let client_order_id =
                ClientOrderId::from(format!("EXPIRATION-{}-{}", self.venue, UUID4::new()).as_str());
            let mut order = OrderAny::Market(MarketOrder::new(
                trader_id,
                strategy_id,
                instrument_id,
                client_order_id,
                closing_side,
                quantity,
                TimeInForce::Gtc,
                UUID4::new(),
                ts_now,
                // NOTE(review): presumably `reduce_only` then `quote_quantity`;
                // confirm against `MarketOrder::new`
                true,
                false,
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                Some(vec![Ustr::from(&format!(
                    "EXPIRATION_{}_CLOSE",
                    self.venue
                ))]),
            ));

            // A failure to add the order to the cache is only logged
            if self
                .cache
                .borrow_mut()
                .add_order(order.clone(), Some(position_id), None, false)
                .is_err()
            {
                log::debug!("Expiration order already in cache: {client_order_id}");
            }

            let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
            self.generate_order_accepted(&mut order, venue_order_id);
            // Fill the whole position quantity at the close price
            self.apply_fills(
                &mut order,
                vec![(close.close_price, quantity)],
                LiquiditySide::Taker,
                Some(position_id),
                None,
            );
        }
    }
1264
1265 #[allow(clippy::needless_return)]
1276 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
1277 {
1279 let cache_borrow = self.cache.as_ref().borrow();
1280
1281 if self.core.order_exists(order.client_order_id()) {
1282 self.generate_order_rejected(order, "Order already exists".into());
1283 return;
1284 }
1285
1286 self.account_ids.insert(order.trader_id(), account_id);
1288
1289 if self.instrument.has_expiration() {
1291 if let Some(activation_ns) = self.instrument.activation_ns()
1292 && self.clock.borrow().timestamp_ns() < activation_ns
1293 {
1294 self.generate_order_rejected(
1295 order,
1296 format!(
1297 "Contract {} is not yet active, activation {activation_ns}",
1298 self.instrument.id(),
1299 )
1300 .into(),
1301 );
1302 return;
1303 }
1304 if let Some(expiration_ns) = self.instrument.expiration_ns()
1305 && self.clock.borrow().timestamp_ns() >= expiration_ns
1306 {
1307 self.generate_order_rejected(
1308 order,
1309 format!(
1310 "Contract {} has expired, expiration {expiration_ns}",
1311 self.instrument.id(),
1312 )
1313 .into(),
1314 );
1315 return;
1316 }
1317 }
1318
1319 if self.config.support_contingent_orders {
1321 if let Some(parent_order_id) = order.parent_order_id() {
1322 let parent_order = cache_borrow.order(&parent_order_id);
1323 if parent_order.is_none()
1324 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
1325 {
1326 panic!("OTO parent not found");
1327 }
1328 if let Some(parent_order) = parent_order {
1329 let parent_order_status = parent_order.status();
1330 let order_is_open = order.is_open();
1331 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
1332 self.generate_order_rejected(
1333 order,
1334 format!("Rejected OTO order from {parent_order_id}").into(),
1335 );
1336 return;
1337 } else if parent_order.status() == OrderStatus::Accepted
1338 && parent_order.status() == OrderStatus::Triggered
1339 {
1340 log::info!(
1341 "Pending OTO order {} triggers from {parent_order_id}",
1342 order.client_order_id(),
1343 );
1344 return;
1345 }
1346 }
1347 }
1348
1349 if let Some(linked_order_ids) = order.linked_order_ids() {
1350 for client_order_id in linked_order_ids {
1351 match cache_borrow.order(client_order_id) {
1352 Some(contingent_order)
1353 if (order.contingency_type().unwrap() == ContingencyType::Oco
1354 || order.contingency_type().unwrap()
1355 == ContingencyType::Ouo)
1356 && !order.is_closed()
1357 && contingent_order.is_closed() =>
1358 {
1359 self.generate_order_rejected(
1360 order,
1361 format!("Contingent order {client_order_id} already closed")
1362 .into(),
1363 );
1364 return;
1365 }
1366 None => panic!("Cannot find contingent order for {client_order_id}"),
1367 _ => {}
1368 }
1369 }
1370 }
1371 }
1372
1373 if order.quantity().precision != self.instrument.size_precision() {
1375 self.generate_order_rejected(
1376 order,
1377 format!(
1378 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
1379 order.client_order_id(),
1380 order.quantity().precision,
1381 self.instrument.id(),
1382 self.instrument.size_precision()
1383 )
1384 .into(),
1385 );
1386 return;
1387 }
1388
1389 if let Some(price) = order.price()
1391 && price.precision != self.instrument.price_precision()
1392 {
1393 self.generate_order_rejected(
1394 order,
1395 format!(
1396 "Invalid order price precision for order {}, was {} when {} price precision is {}",
1397 order.client_order_id(),
1398 price.precision,
1399 self.instrument.id(),
1400 self.instrument.price_precision()
1401 )
1402 .into(),
1403 );
1404 return;
1405 }
1406
1407 if let Some(trigger_price) = order.trigger_price()
1409 && trigger_price.precision != self.instrument.price_precision()
1410 {
1411 self.generate_order_rejected(
1412 order,
1413 format!(
1414 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
1415 order.client_order_id(),
1416 trigger_price.precision,
1417 self.instrument.id(),
1418 self.instrument.price_precision()
1419 )
1420 .into(),
1421 );
1422 return;
1423 }
1424
1425 let position: Option<&Position> = cache_borrow
1427 .position_for_order(&order.client_order_id())
1428 .or_else(|| {
1429 if self.oms_type == OmsType::Netting {
1430 let position_id = PositionId::new(
1431 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
1432 );
1433 cache_borrow.position(&position_id)
1434 } else {
1435 None
1436 }
1437 });
1438
1439 if order.order_side() == OrderSide::Sell
1441 && self.account_type != AccountType::Margin
1442 && matches!(self.instrument, InstrumentAny::Equity(_))
1443 && (position.is_none()
1444 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
1445 {
1446 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
1447 self.generate_order_rejected(
1448 order,
1449 format!(
1450 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
1451 )
1452 .into(),
1453 );
1454 return;
1455 }
1456
1457 if self.config.use_reduce_only
1459 && order.is_reduce_only()
1460 && !order.is_closed()
1461 && position.is_none_or(|pos| {
1462 pos.is_closed()
1463 || (order.is_buy() && pos.is_long())
1464 || (order.is_sell() && pos.is_short())
1465 })
1466 {
1467 self.generate_order_rejected(
1468 order,
1469 format!(
1470 "Reduce-only order {} ({}-{}) would have increased position",
1471 order.client_order_id(),
1472 order.order_type().to_string().to_uppercase(),
1473 order.order_side().to_string().to_uppercase()
1474 )
1475 .into(),
1476 );
1477 return;
1478 }
1479 }
1480
1481 match order.order_type() {
1482 OrderType::Market => self.process_market_order(order),
1483 OrderType::Limit => self.process_limit_order(order),
1484 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
1485 OrderType::StopMarket => self.process_stop_market_order(order),
1486 OrderType::StopLimit => self.process_stop_limit_order(order),
1487 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
1488 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
1489 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
1490 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
1491 }
1492 }
1493
1494 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
1496 if !self.core.order_exists(command.client_order_id) {
1497 self.generate_order_modify_rejected(
1498 command.trader_id,
1499 command.strategy_id,
1500 command.instrument_id,
1501 command.client_order_id,
1502 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1503 command.venue_order_id,
1504 Some(account_id),
1505 );
1506 return;
1507 }
1508
1509 let mut order = match self.cache.borrow().order(&command.client_order_id).cloned() {
1510 Some(order) => order,
1511 None => {
1512 log::error!(
1513 "Cannot modify order: order {} not found in cache",
1514 command.client_order_id
1515 );
1516 return;
1517 }
1518 };
1519
1520 let update_success = self.update_order(
1521 &mut order,
1522 command.quantity,
1523 command.price,
1524 command.trigger_price,
1525 None,
1526 );
1527
1528 if update_success && order.is_open() {
1530 let _ = self.core.delete_order(command.client_order_id);
1531 let match_info = OrderMatchInfo::new(
1532 order.client_order_id(),
1533 order.order_side().as_specified(),
1534 order.order_type(),
1535 order.trigger_price(),
1536 order.price(),
1537 true,
1538 );
1539 self.core.add_order(match_info);
1540
1541 if self.config.queue_position
1542 && let Some(new_price) = order.price()
1543 {
1544 self.snapshot_queue_position(&order, new_price);
1545 self.queue_excess.remove(&order.client_order_id());
1546 }
1547 }
1548 }
1549
1550 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
1552 if !self.core.order_exists(command.client_order_id) {
1553 self.generate_order_cancel_rejected(
1554 command.trader_id,
1555 command.strategy_id,
1556 account_id,
1557 command.instrument_id,
1558 command.client_order_id,
1559 command.venue_order_id,
1560 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1561 );
1562 return;
1563 }
1564
1565 let order = match self.cache.borrow().order(&command.client_order_id).cloned() {
1566 Some(order) => order,
1567 None => {
1568 log::error!(
1569 "Cannot cancel order: order {} not found in cache",
1570 command.client_order_id
1571 );
1572 return;
1573 }
1574 };
1575
1576 if order.is_inflight() || order.is_open() {
1577 self.cancel_order(&order, None);
1578 }
1579 }
1580
1581 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
1583 let instrument_id = command.instrument_id;
1584 let open_orders = self
1585 .cache
1586 .borrow()
1587 .orders_open(None, Some(&instrument_id), None, None, None)
1588 .into_iter()
1589 .cloned()
1590 .collect::<Vec<OrderAny>>();
1591 for order in open_orders {
1592 if command.order_side != OrderSide::NoOrderSide
1593 && command.order_side != order.order_side()
1594 {
1595 continue;
1596 }
1597 if order.is_inflight() || order.is_open() {
1598 self.cancel_order(&order, None);
1599 }
1600 }
1601 }
1602
1603 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
1605 for order in &command.cancels {
1606 self.process_cancel(order, account_id);
1607 }
1608 }
1609
1610 fn process_market_order(&mut self, order: &mut OrderAny) {
1611 if order.time_in_force() == TimeInForce::AtTheOpen
1612 || order.time_in_force() == TimeInForce::AtTheClose
1613 {
1614 log::error!(
1615 "Market auction for the time in force {} is currently not supported",
1616 order.time_in_force()
1617 );
1618 return;
1619 }
1620
1621 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1623 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1624 {
1625 self.generate_order_rejected(
1626 order,
1627 format!("No market for {}", order.instrument_id()).into(),
1628 );
1629 return;
1630 }
1631
1632 if self.config.use_market_order_acks {
1633 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1634 self.generate_order_accepted(order, venue_order_id);
1635 }
1636
1637 if let Err(e) = self
1639 .cache
1640 .borrow_mut()
1641 .add_order(order.clone(), None, None, false)
1642 {
1643 log::debug!("Order already in cache: {e}");
1644 }
1645
1646 self.fill_market_order(order.client_order_id());
1647 }
1648
1649 fn process_limit_order(&mut self, order: &mut OrderAny) {
1650 let limit_px = order.price().expect("Limit order must have a price");
1651 if order.is_post_only()
1652 && self
1653 .core
1654 .is_limit_matched(order.order_side_specified(), limit_px)
1655 {
1656 self.generate_order_rejected(
1657 order,
1658 format!(
1659 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
1660 order.order_type(),
1661 order.order_side(),
1662 order.price().unwrap(),
1663 self.core
1664 .bid
1665 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1666 self.core
1667 .ask
1668 .map_or_else(|| "None".to_string(), |p| p.to_string())
1669 )
1670 .into(),
1671 );
1672 return;
1673 }
1674
1675 self.accept_order(order);
1677
1678 if self
1680 .core
1681 .is_limit_matched(order.order_side_specified(), limit_px)
1682 {
1683 order.set_liquidity_side(LiquiditySide::Taker);
1685
1686 if self
1687 .cache
1688 .borrow_mut()
1689 .add_order(order.clone(), None, None, false)
1690 .is_err()
1691 && let Err(e) = self.cache.borrow_mut().update_order(order)
1692 {
1693 log::debug!("Failed to update order in cache: {e}");
1694 }
1695 self.fill_limit_order(order.client_order_id());
1696 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1697 self.cancel_order(order, None);
1698 } else {
1699 order.set_liquidity_side(LiquiditySide::Maker);
1701
1702 if let Some(price) = order.price() {
1703 self.snapshot_queue_position(order, price);
1704 }
1705
1706 if let Err(e) = self
1707 .cache
1708 .borrow_mut()
1709 .add_order(order.clone(), None, None, false)
1710 {
1711 log::debug!("Order already in cache: {e}");
1712 }
1713 }
1714 }
1715
1716 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
1717 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1719 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1720 {
1721 self.generate_order_rejected(
1722 order,
1723 format!("No market for {}", order.instrument_id()).into(),
1724 );
1725 return;
1726 }
1727
1728 if self.config.use_market_order_acks {
1729 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1730 self.generate_order_accepted(order, venue_order_id);
1731 }
1732
1733 if let Err(e) = self
1735 .cache
1736 .borrow_mut()
1737 .add_order(order.clone(), None, None, false)
1738 {
1739 log::debug!("Order already in cache: {e}");
1740 }
1741 let client_order_id = order.client_order_id();
1742 self.fill_market_order(client_order_id);
1743
1744 let filled_qty = self
1746 .cached_filled_qty
1747 .get(&client_order_id)
1748 .copied()
1749 .unwrap_or_default();
1750 let leaves_qty = order.quantity().saturating_sub(filled_qty);
1751 if !leaves_qty.is_zero() {
1752 let updated_order = self.cache.borrow().order(&client_order_id).cloned();
1754 if let Some(mut updated_order) = updated_order {
1755 self.accept_order(&mut updated_order);
1756 }
1757 }
1758 }
1759
1760 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
1761 let stop_px = order
1762 .trigger_price()
1763 .expect("Stop order must have a trigger price");
1764 if self
1765 .core
1766 .is_stop_matched(order.order_side_specified(), stop_px)
1767 {
1768 if self.config.reject_stop_orders {
1769 self.generate_order_rejected(
1770 order,
1771 format!(
1772 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1773 order.order_type(),
1774 order.order_side(),
1775 order.trigger_price().unwrap(),
1776 self.core
1777 .bid
1778 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1779 self.core
1780 .ask
1781 .map_or_else(|| "None".to_string(), |p| p.to_string())
1782 ).into(),
1783 );
1784 return;
1785 }
1786 if let Err(e) = self
1787 .cache
1788 .borrow_mut()
1789 .add_order(order.clone(), None, None, false)
1790 {
1791 log::debug!("Order already in cache: {e}");
1792 }
1793 self.fill_market_order(order.client_order_id());
1794 return;
1795 }
1796
1797 self.accept_order(order);
1799
1800 order.set_liquidity_side(LiquiditySide::Maker);
1802 if let Err(e) = self
1803 .cache
1804 .borrow_mut()
1805 .add_order(order.clone(), None, None, false)
1806 {
1807 log::debug!("Order already in cache: {e}");
1808 }
1809 }
1810
1811 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
1812 let stop_px = order
1813 .trigger_price()
1814 .expect("Stop order must have a trigger price");
1815 if self
1816 .core
1817 .is_stop_matched(order.order_side_specified(), stop_px)
1818 {
1819 if self.config.reject_stop_orders {
1820 self.generate_order_rejected(
1821 order,
1822 format!(
1823 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1824 order.order_type(),
1825 order.order_side(),
1826 order.trigger_price().unwrap(),
1827 self.core
1828 .bid
1829 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1830 self.core
1831 .ask
1832 .map_or_else(|| "None".to_string(), |p| p.to_string())
1833 ).into(),
1834 );
1835 return;
1836 }
1837
1838 self.accept_order(order);
1839 self.generate_order_triggered(order);
1840
1841 let limit_px = order.price().expect("Stop limit order must have a price");
1843 if self
1844 .core
1845 .is_limit_matched(order.order_side_specified(), limit_px)
1846 {
1847 order.set_liquidity_side(LiquiditySide::Taker);
1848 if let Err(e) = self
1849 .cache
1850 .borrow_mut()
1851 .add_order(order.clone(), None, None, false)
1852 {
1853 log::debug!("Order already in cache: {e}");
1854 }
1855 self.fill_limit_order(order.client_order_id());
1856 }
1857
1858 return;
1860 }
1861
1862 self.accept_order(order);
1863
1864 order.set_liquidity_side(LiquiditySide::Maker);
1866 if let Err(e) = self
1867 .cache
1868 .borrow_mut()
1869 .add_order(order.clone(), None, None, false)
1870 {
1871 log::debug!("Order already in cache: {e}");
1872 }
1873 }
1874
1875 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
1876 if self
1877 .core
1878 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1879 {
1880 if self.config.reject_stop_orders {
1881 self.generate_order_rejected(
1882 order,
1883 format!(
1884 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1885 order.order_type(),
1886 order.order_side(),
1887 order.trigger_price().unwrap(),
1888 self.core
1889 .bid
1890 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1891 self.core
1892 .ask
1893 .map_or_else(|| "None".to_string(), |p| p.to_string())
1894 ).into(),
1895 );
1896 return;
1897 }
1898 if let Err(e) = self
1899 .cache
1900 .borrow_mut()
1901 .add_order(order.clone(), None, None, false)
1902 {
1903 log::debug!("Order already in cache: {e}");
1904 }
1905 self.fill_market_order(order.client_order_id());
1906 return;
1907 }
1908
1909 self.accept_order(order);
1911
1912 order.set_liquidity_side(LiquiditySide::Maker);
1914 if let Err(e) = self
1915 .cache
1916 .borrow_mut()
1917 .add_order(order.clone(), None, None, false)
1918 {
1919 log::debug!("Order already in cache: {e}");
1920 }
1921 }
1922
1923 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1924 if self
1925 .core
1926 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1927 {
1928 if self.config.reject_stop_orders {
1929 self.generate_order_rejected(
1930 order,
1931 format!(
1932 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1933 order.order_type(),
1934 order.order_side(),
1935 order.trigger_price().unwrap(),
1936 self.core
1937 .bid
1938 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1939 self.core
1940 .ask
1941 .map_or_else(|| "None".to_string(), |p| p.to_string())
1942 ).into(),
1943 );
1944 return;
1945 }
1946 self.accept_order(order);
1947 self.generate_order_triggered(order);
1948
1949 if self
1951 .core
1952 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1953 {
1954 order.set_liquidity_side(LiquiditySide::Taker);
1955 if let Err(e) = self
1956 .cache
1957 .borrow_mut()
1958 .add_order(order.clone(), None, None, false)
1959 {
1960 log::debug!("Order already in cache: {e}");
1961 }
1962 self.fill_limit_order(order.client_order_id());
1963 }
1964 return;
1965 }
1966
1967 self.accept_order(order);
1969
1970 order.set_liquidity_side(LiquiditySide::Maker);
1972 if let Err(e) = self
1973 .cache
1974 .borrow_mut()
1975 .add_order(order.clone(), None, None, false)
1976 {
1977 log::debug!("Order already in cache: {e}");
1978 }
1979 }
1980
1981 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1982 if let Some(trigger_price) = order.trigger_price()
1983 && self
1984 .core
1985 .is_stop_matched(order.order_side_specified(), trigger_price)
1986 {
1987 self.generate_order_rejected(
1988 order,
1989 format!(
1990 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1991 order.order_type(),
1992 order.order_side(),
1993 trigger_price,
1994 self.core
1995 .bid
1996 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1997 self.core
1998 .ask
1999 .map_or_else(|| "None".to_string(), |p| p.to_string())
2000 ).into(),
2001 );
2002 return;
2003 }
2004
2005 self.accept_order(order);
2007
2008 order.set_liquidity_side(LiquiditySide::Maker);
2010 if let Err(e) = self
2011 .cache
2012 .borrow_mut()
2013 .add_order(order.clone(), None, None, false)
2014 {
2015 log::debug!("Order already in cache: {e}");
2016 }
2017 }
2018
2019 pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
2028 if aggressor_side == AggressorSide::NoAggressor {
2033 if let Some(bid) = self.book.best_bid_price() {
2034 self.core.set_bid_raw(bid);
2035 }
2036 if let Some(ask) = self.book.best_ask_price() {
2037 self.core.set_ask_raw(ask);
2038 }
2039 }
2040
2041 self.check_instrument_expiration();
2043
2044 self.core.iterate();
2045
2046 let orders_bid = self.core.get_orders_bid().to_vec();
2047 let orders_ask = self.core.get_orders_ask().to_vec();
2048
2049 self.iterate_orders(timestamp_ns, &orders_bid);
2050 self.iterate_orders(timestamp_ns, &orders_ask);
2051
2052 self.core.bid = self.book.best_bid_price();
2055 self.core.ask = self.book.best_ask_price();
2056 }
2057
2058 fn get_trailing_activation_price(
2059 &self,
2060 trigger_type: TriggerType,
2061 order_side: OrderSide,
2062 bid: Option<Price>,
2063 ask: Option<Price>,
2064 last: Option<Price>,
2065 ) -> Option<Price> {
2066 match trigger_type {
2067 TriggerType::LastPrice => last,
2068 TriggerType::LastOrBidAsk => last.or(match order_side {
2069 OrderSide::Buy => ask,
2070 OrderSide::Sell => bid,
2071 _ => None,
2072 }),
2073 _ => match order_side {
2075 OrderSide::Buy => ask,
2076 OrderSide::Sell => bid,
2077 _ => None,
2078 },
2079 }
2080 }
2081
/// Activates a trailing stop order when its activation condition is met.
///
/// Returns `true` when the order is activated after the call (including when it already
/// was), and `false` when it is still waiting for activation. Any order that is not a
/// trailing stop returns `true`.
///
/// For a trailing stop without an activation price, the price chosen by
/// `get_trailing_activation_price` is stored as the activation price and the order is
/// activated immediately; if no such price is available the order stays inactive. With an
/// activation price set, a buy activates once the ask is at or below it and a sell once the
/// bid is at or above it. On activation the order is written back to the cache; a failure
/// to do so is logged as an error.
fn maybe_activate_trailing_stop(
    &mut self,
    order: &mut OrderAny,
    bid: Option<Price>,
    ask: Option<Price>,
    last: Option<Price>,
) -> bool {
    match order {
        OrderAny::TrailingStopMarket(inner) => {
            if inner.is_activated {
                return true;
            }

            // No explicit activation price: activate at the current reference price
            if inner.activation_price.is_none() {
                let px = self.get_trailing_activation_price(
                    inner.trigger_type,
                    inner.order_side(),
                    bid,
                    ask,
                    last,
                );
                if let Some(p) = px {
                    inner.activation_price = Some(p);
                    inner.set_activated();
                    if let Err(e) = self.cache.borrow_mut().update_order(order) {
                        log::error!("Failed to update order: {e}");
                    }
                    return true;
                }
                return false;
            }

            // Activation price present: wait for the market to reach it
            let activation_price = inner.activation_price.unwrap();
            let hit = match inner.order_side() {
                OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
                OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
                _ => false,
            };
            if hit {
                inner.set_activated();
                if let Err(e) = self.cache.borrow_mut().update_order(order) {
                    log::error!("Failed to update order: {e}");
                }
            }
            hit
        }
        // Same activation logic as the trailing stop market arm
        OrderAny::TrailingStopLimit(inner) => {
            if inner.is_activated {
                return true;
            }

            // No explicit activation price: activate at the current reference price
            if inner.activation_price.is_none() {
                let px = self.get_trailing_activation_price(
                    inner.trigger_type,
                    inner.order_side(),
                    bid,
                    ask,
                    last,
                );
                if let Some(p) = px {
                    inner.activation_price = Some(p);
                    inner.set_activated();
                    if let Err(e) = self.cache.borrow_mut().update_order(order) {
                        log::error!("Failed to update order: {e}");
                    }
                    return true;
                }
                return false;
            }

            // Activation price present: wait for the market to reach it
            let activation_price = inner.activation_price.unwrap();
            let hit = match inner.order_side() {
                OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
                OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
                _ => false,
            };
            if hit {
                inner.set_activated();
                if let Err(e) = self.cache.borrow_mut().update_order(order) {
                    log::error!("Failed to update order: {e}");
                }
            }
            hit
        }
        // Not a trailing stop: nothing to activate
        _ => true,
    }
}
2169
2170 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[OrderMatchInfo]) {
2171 for match_info in orders {
2172 let order = match self
2173 .cache
2174 .borrow()
2175 .order(&match_info.client_order_id)
2176 .cloned()
2177 {
2178 Some(order) => order,
2179 None => {
2180 log::warn!(
2181 "Order {} not found in cache during iteration, skipping",
2182 match_info.client_order_id
2183 );
2184 continue;
2185 }
2186 };
2187
2188 if order.is_closed() {
2189 continue;
2190 }
2191
2192 if self.config.support_gtd_orders
2193 && order
2194 .expire_time()
2195 .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
2196 {
2197 let _ = self.core.delete_order(match_info.client_order_id);
2198 self.cached_filled_qty.remove(&match_info.client_order_id);
2199 self.expire_order(&order);
2200 continue;
2201 }
2202
2203 if matches!(
2204 match_info.order_type,
2205 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
2206 ) {
2207 let mut any = order;
2208
2209 if !self.maybe_activate_trailing_stop(
2210 &mut any,
2211 self.core.bid,
2212 self.core.ask,
2213 self.core.last,
2214 ) {
2215 continue;
2216 }
2217
2218 self.update_trailing_stop_order(&mut any);
2219
2220 let _ = self.core.delete_order(match_info.client_order_id);
2222 let updated_match_info = OrderMatchInfo::new(
2223 any.client_order_id(),
2224 any.order_side().as_specified(),
2225 any.order_type(),
2226 any.trigger_price(),
2227 any.price(),
2228 match &any {
2229 OrderAny::TrailingStopMarket(o) => o.is_activated,
2230 OrderAny::TrailingStopLimit(o) => o.is_activated,
2231 _ => true,
2232 },
2233 );
2234 self.core.add_order(updated_match_info);
2235 }
2236
2237 if let Some(target_bid) = self.target_bid {
2239 self.core.bid = Some(target_bid);
2240 self.target_bid = None;
2241 }
2242 if let Some(target_bid) = self.target_bid.take() {
2243 self.core.bid = Some(target_bid);
2244 self.target_bid = None;
2245 }
2246 if let Some(target_ask) = self.target_ask.take() {
2247 self.core.ask = Some(target_ask);
2248 self.target_ask = None;
2249 }
2250 if let Some(target_last) = self.target_last.take() {
2251 self.core.last = Some(target_last);
2252 self.target_last = None;
2253 }
2254 }
2255
2256 self.target_bid = None;
2258 self.target_ask = None;
2259 self.target_last = None;
2260 }
2261
/// Determines the `(price, quantity)` fills for an order that has a limit price.
///
/// Candidate fills come from the book: every crossed level when liquidity consumption is
/// enabled, otherwise a simulated fill of a book order at the limit price. If a last trade
/// size and last price are known, the book produced no fill at that trade price and the
/// limit price is matched, a single fill at the limit price is returned instead, sized by
/// the smaller of the order's leaves quantity and the trade quantity still available.
///
/// Otherwise fill prices may be adjusted for orders with a trigger price filling as taker,
/// and for orders filling as maker; in both cases the current core bid/ask/last are saved
/// as targets and the core prices are overridden. The result is finally passed through
/// `apply_liquidity_consumption`.
///
/// # Panics
///
/// Panics if the order has no price.
fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
    match order.price() {
        Some(order_price) => {
            let mut fills = if self.config.liquidity_consumption {
                let size_prec = self.instrument.size_precision();
                self.book
                    .get_all_crossed_levels(order.order_side(), order_price, size_prec)
            } else {
                let book_order =
                    BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
                self.book.simulate_fills(&book_order)
            };

            // Trade-driven fill: the book shows nothing at the last trade price, yet the
            // limit price is matched
            if let Some(trade_size) = self.last_trade_size
                && let Some(trade_price) = self.core.last
            {
                let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);

                if !fills_at_trade_price
                    && self
                        .core
                        .is_limit_matched(order.order_side_specified(), order_price)
                {
                    let leaves_qty = order.leaves_qty();
                    // With consumption tracking only the unconsumed part of the trade is
                    // available
                    let available_qty = if self.config.liquidity_consumption {
                        let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
                        Quantity::from_raw(remaining, trade_size.precision)
                    } else {
                        trade_size
                    };

                    let fill_qty = min(leaves_qty, available_qty);

                    if !fill_qty.is_zero() {
                        log::debug!(
                            "Trade execution fill: {} @ {} (trade_price={}, available: {}, book had {} fills)",
                            fill_qty,
                            order_price,
                            trade_price,
                            available_qty,
                            fills.len()
                        );

                        if self.config.liquidity_consumption {
                            self.trade_consumption += fill_qty.raw;
                        }

                        return vec![(order_price, fill_qty)];
                    }
                }
            }

            if fills.is_empty() {
                return fills;
            }

            // Remember the book prices before any adjustment below; only collected when
            // liquidity consumption is enabled
            let book_prices: Vec<Price> = if self.config.liquidity_consumption {
                fills.iter().map(|(px, _)| *px).collect()
            } else {
                Vec::new()
            };
            let book_prices_ref: Option<&[Price]> = if book_prices.is_empty() {
                None
            } else {
                Some(&book_prices)
            };

            if let Some(triggered_price) = order.trigger_price() {
                // Taker fill of an order with a trigger price: the first fill is re-priced
                // at the trigger price when the limit price lies beyond it
                if order
                    .liquidity_side()
                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
                {
                    if order.order_side() == OrderSide::Sell && order_price > triggered_price {
                        let first_fill = fills.first().unwrap();
                        let triggered_qty = first_fill.1;
                        fills[0] = (triggered_price, triggered_qty);
                        // Save the current market as targets before overriding the core
                        self.target_bid = self.core.bid;
                        self.target_ask = self.core.ask;
                        self.target_last = self.core.last;
                        self.core.set_ask_raw(order_price);
                        self.core.set_last_raw(order_price);
                    } else if order.order_side() == OrderSide::Buy
                        && order_price < triggered_price
                    {
                        let first_fill = fills.first().unwrap();
                        let triggered_qty = first_fill.1;
                        fills[0] = (triggered_price, triggered_qty);
                        // Save the current market as targets before overriding the core
                        self.target_bid = self.core.bid;
                        self.target_ask = self.core.ask;
                        self.target_last = self.core.last;
                        self.core.set_bid_raw(order_price);
                        self.core.set_last_raw(order_price);
                    }
                }
            }

            // Maker fill: fills priced better than the limit are moved to the target price
            if order
                .liquidity_side()
                .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
            {
                match order.order_side().as_specified() {
                    OrderSideSpecified::Buy => {
                        // Trigger price when the limit is above it, else the limit price
                        let target_price = if order
                            .trigger_price()
                            .is_some_and(|trigger_price| order_price > trigger_price)
                        {
                            order.trigger_price().unwrap()
                        } else {
                            order_price
                        };
                        for fill in &mut fills {
                            let last_px = fill.0;
                            if last_px < order_price {
                                self.target_bid = self.core.bid;
                                self.target_ask = self.core.ask;
                                self.target_last = self.core.last;
                                self.core.set_ask_raw(target_price);
                                self.core.set_last_raw(target_price);
                                fill.0 = target_price;
                            }
                        }
                    }
                    OrderSideSpecified::Sell => {
                        // Trigger price when the limit is below it, else the limit price
                        let target_price = if order
                            .trigger_price()
                            .is_some_and(|trigger_price| order_price < trigger_price)
                        {
                            order.trigger_price().unwrap()
                        } else {
                            order_price
                        };
                        for fill in &mut fills {
                            let last_px = fill.0;
                            if last_px > order_price {
                                self.target_bid = self.core.bid;
                                self.target_ask = self.core.ask;
                                self.target_last = self.core.last;
                                self.core.set_bid_raw(target_price);
                                self.core.set_last_raw(target_price);
                                fill.0 = target_price;
                            }
                        }
                    }
                }
            }

            self.apply_liquidity_consumption(
                fills,
                order.order_side(),
                order.leaves_qty(),
                book_prices_ref,
            )
        }
        None => panic!("Limit order must have a price"),
    }
}
2440
2441 fn determine_market_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
2442 let price = match order.order_side().as_specified() {
2443 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
2444 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
2445 };
2446
2447 let mut fills = if self.config.liquidity_consumption {
2450 let size_prec = self.instrument.size_precision();
2451 self.book
2452 .get_all_crossed_levels(order.order_side(), price, size_prec)
2453 } else {
2454 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
2455 self.book.simulate_fills(&book_order)
2456 };
2457
2458 if !self.fill_at_market
2461 && self.book_type == BookType::L1_MBP
2462 && !fills.is_empty()
2463 && matches!(
2464 order.order_type(),
2465 OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
2466 )
2467 && let Some(trigger_price) = order.trigger_price()
2468 {
2469 fills[0] = (trigger_price, fills[0].1);
2470
2471 let mut remaining_qty = order.leaves_qty().raw;
2473 let mut capped_fills = Vec::with_capacity(fills.len());
2474
2475 for (price, qty) in fills {
2476 if remaining_qty == 0 {
2477 break;
2478 }
2479
2480 let capped_qty_raw = min(qty.raw, remaining_qty);
2481 if capped_qty_raw == 0 {
2482 continue;
2483 }
2484
2485 remaining_qty -= capped_qty_raw;
2486 capped_fills.push((price, Quantity::from_raw(capped_qty_raw, qty.precision)));
2487 }
2488
2489 return capped_fills;
2490 }
2491
2492 fills
2493 }
2494
2495 fn determine_market_fill_model_price_and_volume(
2496 &mut self,
2497 order: &OrderAny,
2498 ) -> (Vec<(Price, Quantity)>, bool) {
2499 if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
2500 && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
2501 &self.instrument,
2502 order,
2503 best_bid,
2504 best_ask,
2505 )
2506 {
2507 let price = match order.order_side().as_specified() {
2508 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
2509 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
2510 };
2511 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
2512 let fills = book.simulate_fills(&book_order);
2513 if !fills.is_empty() {
2514 return (fills, true);
2515 }
2516 }
2517 (self.determine_market_price_and_volume(order), false)
2518 }
2519
2520 fn determine_limit_fill_model_price_and_volume(
2521 &mut self,
2522 order: &OrderAny,
2523 ) -> Vec<(Price, Quantity)> {
2524 if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
2525 && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
2526 &self.instrument,
2527 order,
2528 best_bid,
2529 best_ask,
2530 )
2531 && let Some(limit_price) = order.price()
2532 {
2533 let book_order = BookOrder::new(order.order_side(), limit_price, order.quantity(), 0);
2534 let fills = book.simulate_fills(&book_order);
2535 if !fills.is_empty() {
2536 return fills;
2537 }
2538 }
2539 self.determine_limit_price_and_volume(order)
2540 }
2541
2542 pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
2547 let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
2548 Some(order) => order,
2549 None => {
2550 log::error!("Cannot fill market order: order {client_order_id} not found in cache");
2551 return;
2552 }
2553 };
2554
2555 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
2556 && filled_qty >= &order.quantity()
2557 {
2558 log::info!(
2559 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
2560 filled_qty,
2561 order.quantity(),
2562 order.filled_qty(),
2563 order.quantity()
2564 );
2565 return;
2566 }
2567
2568 let venue_position_id = self.ids_generator.get_position_id(&order, Some(true));
2569 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
2570 let cache = self.cache.as_ref().borrow();
2571 cache.position(&venue_position_id).cloned()
2572 } else {
2573 None
2574 };
2575
2576 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
2577 log::warn!(
2578 "Canceling REDUCE_ONLY {} as would increase position",
2579 order.order_type()
2580 );
2581 self.cancel_order(&order, None);
2582 return;
2583 }
2584
2585 order.set_liquidity_side(LiquiditySide::Taker);
2586 let (mut fills, from_synthetic) = self.determine_market_fill_model_price_and_volume(&order);
2587
2588 if let Some(protection_points) = self.config.price_protection_points
2590 && matches!(
2591 order.order_type(),
2592 OrderType::Market | OrderType::StopMarket
2593 )
2594 && let Ok(protection_price) = protection_price_calculate(
2595 self.instrument.price_increment(),
2596 &order,
2597 protection_points,
2598 self.core.bid,
2599 self.core.ask,
2600 )
2601 {
2602 fills = self.filter_fills_by_protection(fills, &order, protection_price);
2603 }
2604
2605 let is_trigger_price_fill = !self.fill_at_market
2608 && self.book_type == BookType::L1_MBP
2609 && matches!(
2610 order.order_type(),
2611 OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
2612 )
2613 && order.trigger_price().is_some();
2614
2615 if !from_synthetic && !is_trigger_price_fill {
2616 fills = self.apply_liquidity_consumption(
2617 fills,
2618 order.order_side(),
2619 order.leaves_qty(),
2620 None,
2621 );
2622 }
2623
2624 self.apply_fills(&mut order, fills, LiquiditySide::Taker, None, position);
2625 }
2626
2627 fn filter_fills_by_protection(
2628 &self,
2629 fills: Vec<(Price, Quantity)>,
2630 order: &OrderAny,
2631 protection_price: Price,
2632 ) -> Vec<(Price, Quantity)> {
2633 let protection_raw = protection_price.raw;
2634 fills
2635 .into_iter()
2636 .filter(|(fill_price, _)| {
2637 match order.order_side() {
2638 OrderSide::Buy => fill_price.raw <= protection_raw,
2640 OrderSide::Sell => fill_price.raw >= protection_raw,
2642 OrderSide::NoOrderSide => false,
2643 }
2644 })
2645 .collect()
2646 }
2647
2648 pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
2657 let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
2658 Some(order) => order,
2659 None => {
2660 log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
2661 return;
2662 }
2663 };
2664
2665 match order.price() {
2666 Some(order_price) => {
2667 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
2668 if let Some(&qty) = cached_filled_qty
2669 && qty >= order.quantity()
2670 {
2671 log::debug!(
2672 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
2673 qty,
2674 order.quantity(),
2675 order.filled_qty(),
2676 order.leaves_qty(),
2677 );
2678 return;
2679 }
2680
2681 if order
2683 .liquidity_side()
2684 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
2685 {
2686 let at_limit = if self.last_trade_size.is_some() && self.core.last.is_some() {
2689 self.core.last.is_some_and(|last| last == order_price)
2690 } else if order.order_side() == OrderSide::Buy {
2691 self.core.bid.is_some_and(|bid| bid == order_price)
2692 } else {
2693 self.core.ask.is_some_and(|ask| ask == order_price)
2694 };
2695
2696 if at_limit && !self.fill_model.is_limit_filled() {
2697 return; }
2699 }
2700
2701 let queue_allowed_raw = if self.config.queue_position {
2702 match self.determine_trade_fill_qty(&order) {
2703 None => return,
2704 Some(0) => return,
2705 Some(allowed) => Some(allowed),
2706 }
2707 } else {
2708 None
2709 };
2710
2711 let venue_position_id = self.ids_generator.get_position_id(&order, None);
2712 let position = if let Some(venue_position_id) = venue_position_id {
2713 let cache = self.cache.as_ref().borrow();
2714 cache.position(&venue_position_id).cloned()
2715 } else {
2716 None
2717 };
2718
2719 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
2720 log::warn!(
2721 "Canceling REDUCE_ONLY {} as would increase position",
2722 order.order_type()
2723 );
2724 self.cancel_order(&order, None);
2725 return;
2726 }
2727
2728 let tc_before = self.trade_consumption;
2729 let mut fills = self.determine_limit_fill_model_price_and_volume(&order);
2730
2731 if let Some(allowed_raw) = queue_allowed_raw {
2732 let size_prec = self.instrument.size_precision();
2733 let mut remaining = allowed_raw;
2734 fills = fills
2735 .into_iter()
2736 .filter_map(|(price, qty)| {
2737 if remaining == 0 {
2738 return None;
2739 }
2740 let capped = qty.raw.min(remaining);
2741 remaining -= capped;
2742 Some((price, Quantity::from_raw(capped, size_prec)))
2743 })
2744 .collect();
2745
2746 let consumed: QuantityRaw = fills.iter().map(|(_, qty)| qty.raw).sum();
2748 if let Some(excess) = self.queue_excess.get_mut(&order.client_order_id()) {
2749 *excess = excess.saturating_sub(consumed);
2750 }
2751 self.trade_consumption = tc_before + consumed;
2752 }
2753
2754 if fills.is_empty() && self.config.liquidity_consumption {
2758 log::debug!(
2759 "Skipping fill for {}: no liquidity available after consumption",
2760 order.client_order_id()
2761 );
2762
2763 if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
2764 self.cancel_order(&order, None);
2765 }
2766
2767 return;
2768 }
2769
2770 let liquidity_side = order.liquidity_side().unwrap();
2771 self.apply_fills(
2772 &mut order,
2773 fills,
2774 liquidity_side,
2775 venue_position_id,
2776 position,
2777 );
2778 }
2779 None => panic!("Limit order must have a price"),
2780 }
2781 }
2782
2783 fn apply_fills(
2784 &mut self,
2785 order: &mut OrderAny,
2786 fills: Vec<(Price, Quantity)>,
2787 liquidity_side: LiquiditySide,
2788 venue_position_id: Option<PositionId>,
2789 position: Option<Position>,
2790 ) {
2791 if order.time_in_force() == TimeInForce::Fok {
2792 let mut total_size = Quantity::zero(order.quantity().precision);
2793 for (fill_px, fill_qty) in &fills {
2794 total_size = total_size.add(*fill_qty);
2795 }
2796
2797 if order.leaves_qty() > total_size {
2798 self.cancel_order(order, None);
2799 return;
2800 }
2801 }
2802
2803 if fills.is_empty() {
2804 if order.status() == OrderStatus::Submitted {
2805 self.generate_order_rejected(
2806 order,
2807 format!("No market for {}", order.instrument_id()).into(),
2808 );
2809 } else {
2810 log::error!(
2811 "Cannot fill order: no fills from book when fills were expected (check size in data)"
2812 );
2813 return;
2814 }
2815 }
2816
2817 let venue_position_id = if self.oms_type == OmsType::Netting {
2819 None
2820 } else {
2821 venue_position_id
2822 };
2823
2824 let mut initial_market_to_limit_fill = false;
2825
2826 for &(mut fill_px, ref fill_qty) in &fills {
2827 assert!(
2828 (fill_px.precision == self.instrument.price_precision()),
2829 "Invalid price precision for fill price {} when instrument price precision is {}.\
2830 Check that the data price precision matches the {} instrument",
2831 fill_px.precision,
2832 self.instrument.price_precision(),
2833 self.instrument.id()
2834 );
2835
2836 assert!(
2837 (fill_qty.precision == self.instrument.size_precision()),
2838 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
2839 Check that the data quantity precision matches the {} instrument",
2840 fill_qty.precision,
2841 self.instrument.size_precision(),
2842 self.instrument.id()
2843 );
2844
2845 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
2846 && order.order_type() == OrderType::MarketToLimit
2847 {
2848 self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
2849 initial_market_to_limit_fill = true;
2850 }
2851
2852 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
2853 fill_px = match order.order_side().as_specified() {
2854 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
2855 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
2856 }
2857 }
2858
2859 let mut effective_fill_qty = *fill_qty;
2863
2864 if self.config.use_reduce_only
2865 && order.is_reduce_only()
2866 && let Some(position) = &position
2867 && *fill_qty > position.quantity
2868 {
2869 if position.quantity == Quantity::zero(position.quantity.precision) {
2870 return;
2872 }
2873
2874 let adjusted_fill_qty =
2876 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
2877
2878 effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
2880
2881 if order.quantity() != adjusted_fill_qty {
2883 self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
2884 }
2885 }
2886
2887 if fill_qty.is_zero() {
2888 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
2889 self.generate_order_rejected(
2890 order,
2891 format!("No market for {}", order.instrument_id()).into(),
2892 );
2893 }
2894 return;
2895 }
2896
2897 self.fill_order(
2898 order,
2899 fill_px,
2900 effective_fill_qty,
2901 liquidity_side,
2902 venue_position_id,
2903 position.clone(),
2904 );
2905
2906 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
2907 return;
2909 }
2910 }
2911
2912 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
2913 self.cancel_order(order, None);
2915 return;
2916 }
2917
2918 if order.is_open()
2919 && self.book_type == BookType::L1_MBP
2920 && matches!(
2921 order.order_type(),
2922 OrderType::Market
2923 | OrderType::MarketIfTouched
2924 | OrderType::StopMarket
2925 | OrderType::TrailingStopMarket
2926 )
2927 {
2928 todo!("Exhausted simulated book volume")
2932 }
2933 }
2934
2935 fn fill_order(
2936 &mut self,
2937 order: &mut OrderAny,
2938 last_px: Price,
2939 last_qty: Quantity,
2940 liquidity_side: LiquiditySide,
2941 venue_position_id: Option<PositionId>,
2942 position: Option<Position>,
2943 ) {
2944 self.check_size_precision(last_qty.precision, "fill quantity")
2945 .unwrap();
2946
2947 match self.cached_filled_qty.get(&order.client_order_id()) {
2948 Some(filled_qty) => {
2949 let leaves_qty = order.quantity().saturating_sub(*filled_qty);
2951 let last_qty = min(last_qty, leaves_qty);
2952 let new_filled_qty = *filled_qty + last_qty;
2953 self.cached_filled_qty
2955 .insert(order.client_order_id(), new_filled_qty);
2956 }
2957 None => {
2958 self.cached_filled_qty
2959 .insert(order.client_order_id(), last_qty);
2960 }
2961 }
2962
2963 let commission = self
2965 .fee_model
2966 .get_commission(order, last_qty, last_px, &self.instrument)
2967 .unwrap();
2968
2969 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2970 self.generate_order_filled(
2971 order,
2972 venue_order_id,
2973 venue_position_id,
2974 last_qty,
2975 last_px,
2976 self.instrument.quote_currency(),
2977 commission,
2978 liquidity_side,
2979 );
2980
2981 if order.is_passive() && order.is_closed() {
2982 if self.core.order_exists(order.client_order_id()) {
2984 let _ = self.core.delete_order(order.client_order_id());
2985 }
2986 self.cached_filled_qty.remove(&order.client_order_id());
2987 }
2988
2989 if !self.config.support_contingent_orders {
2990 return;
2991 }
2992
2993 if let Some(contingency_type) = order.contingency_type() {
2994 match contingency_type {
2995 ContingencyType::Oto => {
2996 if let Some(linked_orders_ids) = order.linked_order_ids() {
2997 for client_order_id in linked_orders_ids {
2998 let mut child_order = match self.cache.borrow().order(client_order_id) {
2999 Some(child_order) => child_order.clone(),
3000 None => panic!("Order {client_order_id} not found in cache"),
3001 };
3002
3003 if child_order.is_closed() || child_order.is_active_local() {
3004 continue;
3005 }
3006
3007 if let (None, Some(position_id)) =
3009 (child_order.position_id(), order.position_id())
3010 {
3011 self.cache
3012 .borrow_mut()
3013 .add_position_id(
3014 &position_id,
3015 &self.venue,
3016 client_order_id,
3017 &child_order.strategy_id(),
3018 )
3019 .unwrap();
3020 log::debug!(
3021 "Added position id {position_id} to cache for order {client_order_id}"
3022 );
3023 }
3024
3025 if (!child_order.is_open())
3026 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
3027 && child_order
3028 .previous_status()
3029 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
3030 {
3031 let account_id = order.account_id().unwrap_or_else(|| {
3032 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
3033 panic!(
3034 "Account ID not found for trader {}",
3035 order.trader_id()
3036 )
3037 })
3038 });
3039 self.process_order(&mut child_order, account_id);
3040 }
3041 }
3042 } else {
3043 log::error!(
3044 "OTO order {} does not have linked orders",
3045 order.client_order_id()
3046 );
3047 }
3048 }
3049 ContingencyType::Oco => {
3050 if let Some(linked_orders_ids) = order.linked_order_ids() {
3051 for client_order_id in linked_orders_ids {
3052 let child_order = match self.cache.borrow().order(client_order_id) {
3053 Some(child_order) => child_order.clone(),
3054 None => panic!("Order {client_order_id} not found in cache"),
3055 };
3056
3057 if child_order.is_closed() || child_order.is_active_local() {
3058 continue;
3059 }
3060
3061 self.cancel_order(&child_order, None);
3062 }
3063 } else {
3064 log::error!(
3065 "OCO order {} does not have linked orders",
3066 order.client_order_id()
3067 );
3068 }
3069 }
3070 ContingencyType::Ouo => {
3071 if let Some(linked_orders_ids) = order.linked_order_ids() {
3072 for client_order_id in linked_orders_ids {
3073 let mut child_order = match self.cache.borrow().order(client_order_id) {
3074 Some(child_order) => child_order.clone(),
3075 None => panic!("Order {client_order_id} not found in cache"),
3076 };
3077
3078 if child_order.is_active_local() {
3079 continue;
3080 }
3081
3082 if order.is_closed() && child_order.is_open() {
3083 self.cancel_order(&child_order, None);
3084 } else if !order.leaves_qty().is_zero()
3085 && order.leaves_qty() != child_order.leaves_qty()
3086 {
3087 let price = child_order.price();
3088 let trigger_price = child_order.trigger_price();
3089 self.update_order(
3090 &mut child_order,
3091 Some(order.leaves_qty()),
3092 price,
3093 trigger_price,
3094 Some(false),
3095 );
3096 }
3097 }
3098 } else {
3099 log::error!(
3100 "OUO order {} does not have linked orders",
3101 order.client_order_id()
3102 );
3103 }
3104 }
3105 _ => {}
3106 }
3107 }
3108 }
3109
3110 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
3111 if self
3112 .core
3113 .is_limit_matched(order.order_side_specified(), price)
3114 {
3115 if order.is_post_only() {
3116 self.generate_order_modify_rejected(
3117 order.trader_id(),
3118 order.strategy_id(),
3119 order.instrument_id(),
3120 order.client_order_id(),
3121 Ustr::from(format!(
3122 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3123 order.order_type(),
3124 order.order_side(),
3125 price,
3126 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3127 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3128 ).as_str()),
3129 order.venue_order_id(),
3130 order.account_id(),
3131 );
3132 return;
3133 }
3134
3135 self.generate_order_updated(order, quantity, Some(price), None, None);
3136
3137 let client_order_id = order.client_order_id();
3139 if let Some(cached) = self.cache.borrow_mut().mut_order(&client_order_id) {
3140 cached.set_liquidity_side(LiquiditySide::Taker);
3141 }
3142 self.fill_limit_order(client_order_id);
3143 return;
3144 }
3145 self.generate_order_updated(order, quantity, Some(price), None, None);
3146 }
3147
3148 fn update_stop_market_order(
3149 &mut self,
3150 order: &mut OrderAny,
3151 quantity: Quantity,
3152 trigger_price: Price,
3153 ) {
3154 if self
3155 .core
3156 .is_stop_matched(order.order_side_specified(), trigger_price)
3157 {
3158 self.generate_order_modify_rejected(
3159 order.trader_id(),
3160 order.strategy_id(),
3161 order.instrument_id(),
3162 order.client_order_id(),
3163 Ustr::from(
3164 format!(
3165 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3166 order.order_type(),
3167 order.order_side(),
3168 trigger_price,
3169 self.core
3170 .bid
3171 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3172 self.core
3173 .ask
3174 .map_or_else(|| "None".to_string(), |p| p.to_string())
3175 )
3176 .as_str(),
3177 ),
3178 order.venue_order_id(),
3179 order.account_id(),
3180 );
3181 return;
3182 }
3183
3184 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
3185 }
3186
3187 fn update_stop_limit_order(
3188 &mut self,
3189 order: &mut OrderAny,
3190 quantity: Quantity,
3191 price: Price,
3192 trigger_price: Price,
3193 ) {
3194 if order.is_triggered().is_some_and(|t| t) {
3195 if self
3197 .core
3198 .is_limit_matched(order.order_side_specified(), price)
3199 {
3200 if order.is_post_only() {
3201 self.generate_order_modify_rejected(
3202 order.trader_id(),
3203 order.strategy_id(),
3204 order.instrument_id(),
3205 order.client_order_id(),
3206 Ustr::from(format!(
3207 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3208 order.order_type(),
3209 order.order_side(),
3210 price,
3211 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3212 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3213 ).as_str()),
3214 order.venue_order_id(),
3215 order.account_id(),
3216 );
3217 return;
3218 }
3219 self.generate_order_updated(order, quantity, Some(price), None, None);
3220 order.set_liquidity_side(LiquiditySide::Taker);
3221 if let Err(e) = self
3222 .cache
3223 .borrow_mut()
3224 .add_order(order.clone(), None, None, false)
3225 {
3226 log::debug!("Order already in cache: {e}");
3227 }
3228 self.fill_limit_order(order.client_order_id());
3229 return; }
3231 } else {
3232 if self
3234 .core
3235 .is_stop_matched(order.order_side_specified(), trigger_price)
3236 {
3237 self.generate_order_modify_rejected(
3238 order.trader_id(),
3239 order.strategy_id(),
3240 order.instrument_id(),
3241 order.client_order_id(),
3242 Ustr::from(
3243 format!(
3244 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3245 order.order_type(),
3246 order.order_side(),
3247 trigger_price,
3248 self.core
3249 .bid
3250 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3251 self.core
3252 .ask
3253 .map_or_else(|| "None".to_string(), |p| p.to_string())
3254 )
3255 .as_str(),
3256 ),
3257 order.venue_order_id(),
3258 order.account_id(),
3259 );
3260 return;
3261 }
3262 }
3263
3264 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
3265 }
3266
3267 fn update_market_if_touched_order(
3268 &mut self,
3269 order: &mut OrderAny,
3270 quantity: Quantity,
3271 trigger_price: Price,
3272 ) {
3273 if self
3274 .core
3275 .is_touch_triggered(order.order_side_specified(), trigger_price)
3276 {
3277 self.generate_order_modify_rejected(
3278 order.trader_id(),
3279 order.strategy_id(),
3280 order.instrument_id(),
3281 order.client_order_id(),
3282 Ustr::from(
3283 format!(
3284 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
3285 order.order_type(),
3286 order.order_side(),
3287 trigger_price,
3288 self.core
3289 .bid
3290 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3291 self.core
3292 .ask
3293 .map_or_else(|| "None".to_string(), |p| p.to_string())
3294 )
3295 .as_str(),
3296 ),
3297 order.venue_order_id(),
3298 order.account_id(),
3299 );
3300 return;
3302 }
3303
3304 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
3305 }
3306
3307 fn update_limit_if_touched_order(
3308 &mut self,
3309 order: &mut OrderAny,
3310 quantity: Quantity,
3311 price: Price,
3312 trigger_price: Price,
3313 ) {
3314 if order.is_triggered().is_some_and(|t| t) {
3315 if self
3317 .core
3318 .is_limit_matched(order.order_side_specified(), price)
3319 {
3320 if order.is_post_only() {
3321 self.generate_order_modify_rejected(
3322 order.trader_id(),
3323 order.strategy_id(),
3324 order.instrument_id(),
3325 order.client_order_id(),
3326 Ustr::from(format!(
3327 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3328 order.order_type(),
3329 order.order_side(),
3330 price,
3331 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3332 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3333 ).as_str()),
3334 order.venue_order_id(),
3335 order.account_id(),
3336 );
3337 return;
3339 }
3340 self.generate_order_updated(order, quantity, Some(price), None, None);
3341 order.set_liquidity_side(LiquiditySide::Taker);
3342 self.fill_limit_order(order.client_order_id());
3343 return;
3344 }
3345 } else {
3346 if self
3348 .core
3349 .is_touch_triggered(order.order_side_specified(), trigger_price)
3350 {
3351 self.generate_order_modify_rejected(
3352 order.trader_id(),
3353 order.strategy_id(),
3354 order.instrument_id(),
3355 order.client_order_id(),
3356 Ustr::from(
3357 format!(
3358 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
3359 order.order_type(),
3360 order.order_side(),
3361 trigger_price,
3362 self.core
3363 .bid
3364 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3365 self.core
3366 .ask
3367 .map_or_else(|| "None".to_string(), |p| p.to_string())
3368 )
3369 .as_str(),
3370 ),
3371 order.venue_order_id(),
3372 order.account_id(),
3373 );
3374 return;
3375 }
3376 }
3377
3378 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
3379 }
3380
3381 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
3382 let (new_trigger_price, new_price) = trailing_stop_calculate(
3383 self.instrument.price_increment(),
3384 order.trigger_price(),
3385 order.activation_price(),
3386 order,
3387 self.core.bid,
3388 self.core.ask,
3389 self.core.last,
3390 )
3391 .unwrap();
3392
3393 if new_trigger_price.is_none() && new_price.is_none() {
3394 return;
3395 }
3396
3397 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
3398 }
3399
3400 fn accept_order(&mut self, order: &mut OrderAny) {
3403 if order.is_closed() {
3404 return;
3406 }
3407 if order.status() != OrderStatus::Accepted {
3408 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3409 self.generate_order_accepted(order, venue_order_id);
3410
3411 if matches!(
3412 order.order_type(),
3413 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
3414 ) && order.trigger_price().is_none()
3415 {
3416 self.update_trailing_stop_order(order);
3417 }
3418 }
3419
3420 let match_info = OrderMatchInfo::new(
3421 order.client_order_id(),
3422 order.order_side().as_specified(),
3423 order.order_type(),
3424 order.trigger_price(),
3425 order.price(),
3426 match order {
3427 OrderAny::TrailingStopMarket(o) => o.is_activated,
3428 OrderAny::TrailingStopLimit(o) => o.is_activated,
3429 _ => true,
3430 },
3431 );
3432 self.core.add_order(match_info);
3433 }
3434
3435 fn expire_order(&mut self, order: &OrderAny) {
3436 if self.config.support_contingent_orders
3437 && order
3438 .contingency_type()
3439 .is_some_and(|c| c != ContingencyType::NoContingency)
3440 {
3441 self.cancel_contingent_orders(order);
3442 }
3443
3444 self.generate_order_expired(order);
3445 }
3446
3447 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
3448 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
3449 if order.is_active_local() {
3450 log::error!(
3451 "Cannot cancel an order with {} from the matching engine",
3452 order.status()
3453 );
3454 return;
3455 }
3456
3457 if self.core.order_exists(order.client_order_id()) {
3459 let _ = self.core.delete_order(order.client_order_id());
3460 }
3461 self.cached_filled_qty.remove(&order.client_order_id());
3462
3463 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3464 self.generate_order_canceled(order, venue_order_id);
3465
3466 if self.config.support_contingent_orders
3467 && order.contingency_type().is_some()
3468 && order.contingency_type().unwrap() != ContingencyType::NoContingency
3469 && cancel_contingencies
3470 {
3471 self.cancel_contingent_orders(order);
3472 }
3473 }
3474
3475 fn update_order(
3476 &mut self,
3477 order: &mut OrderAny,
3478 quantity: Option<Quantity>,
3479 price: Option<Price>,
3480 trigger_price: Option<Price>,
3481 update_contingencies: Option<bool>,
3482 ) -> bool {
3483 let update_contingencies = update_contingencies.unwrap_or(true);
3484 let quantity = quantity.unwrap_or(order.quantity());
3485
3486 let price_prec = self.instrument.price_precision();
3487 let size_prec = self.instrument.size_precision();
3488 let instrument_id = self.instrument.id();
3489 if quantity.precision != size_prec {
3490 self.generate_order_modify_rejected(
3491 order.trader_id(),
3492 order.strategy_id(),
3493 order.instrument_id(),
3494 order.client_order_id(),
3495 Ustr::from(&format!(
3496 "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
3497 quantity.precision
3498 )),
3499 order.venue_order_id(),
3500 order.account_id(),
3501 );
3502 return false;
3503 }
3504 if let Some(px) = price
3505 && px.precision != price_prec
3506 {
3507 self.generate_order_modify_rejected(
3508 order.trader_id(),
3509 order.strategy_id(),
3510 order.instrument_id(),
3511 order.client_order_id(),
3512 Ustr::from(&format!(
3513 "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
3514 px.precision
3515 )),
3516 order.venue_order_id(),
3517 order.account_id(),
3518 );
3519 return false;
3520 }
3521 if let Some(tp) = trigger_price
3522 && tp.precision != price_prec
3523 {
3524 self.generate_order_modify_rejected(
3525 order.trader_id(),
3526 order.strategy_id(),
3527 order.instrument_id(),
3528 order.client_order_id(),
3529 Ustr::from(&format!(
3530 "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
3531 tp.precision
3532 )),
3533 order.venue_order_id(),
3534 order.account_id(),
3535 );
3536 return false;
3537 }
3538
3539 let filled_qty = self
3541 .cached_filled_qty
3542 .get(&order.client_order_id())
3543 .copied()
3544 .unwrap_or(order.filled_qty());
3545 if quantity < filled_qty {
3546 self.generate_order_modify_rejected(
3547 order.trader_id(),
3548 order.strategy_id(),
3549 order.instrument_id(),
3550 order.client_order_id(),
3551 Ustr::from(&format!(
3552 "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
3553 )),
3554 order.venue_order_id(),
3555 order.account_id(),
3556 );
3557 return false;
3558 }
3559
3560 match order {
3561 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
3562 let price = price.unwrap_or(order.price().unwrap());
3563 self.update_limit_order(order, quantity, price);
3564 }
3565 OrderAny::StopMarket(_) => {
3566 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3567 self.update_stop_market_order(order, quantity, trigger_price);
3568 }
3569 OrderAny::StopLimit(_) => {
3570 let price = price.unwrap_or(order.price().unwrap());
3571 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3572 self.update_stop_limit_order(order, quantity, price, trigger_price);
3573 }
3574 OrderAny::MarketIfTouched(_) => {
3575 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3576 self.update_market_if_touched_order(order, quantity, trigger_price);
3577 }
3578 OrderAny::LimitIfTouched(_) => {
3579 let price = price.unwrap_or(order.price().unwrap());
3580 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3581 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
3582 }
3583 OrderAny::TrailingStopMarket(_) => {
3584 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3585 self.update_market_if_touched_order(order, quantity, trigger_price);
3586 }
3587 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
3588 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
3589 let trigger_price =
3590 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
3591 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
3592 }
3593 _ => {
3594 panic!(
3595 "Unsupported order type {} for update_order",
3596 order.order_type()
3597 );
3598 }
3599 }
3600
3601 let new_leaves_qty = quantity.saturating_sub(filled_qty);
3603 if new_leaves_qty.is_zero() {
3604 if self.config.support_contingent_orders
3605 && order
3606 .contingency_type()
3607 .is_some_and(|c| c != ContingencyType::NoContingency)
3608 && update_contingencies
3609 {
3610 self.update_contingent_order(order);
3611 }
3612 self.cancel_order(order, Some(false));
3614 return true;
3615 }
3616
3617 if self.config.support_contingent_orders
3618 && order
3619 .contingency_type()
3620 .is_some_and(|c| c != ContingencyType::NoContingency)
3621 && update_contingencies
3622 {
3623 self.update_contingent_order(order);
3624 }
3625
3626 true
3627 }
3628
3629 pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
3631 let order = match self.cache.borrow().order(&client_order_id).cloned() {
3632 Some(order) => order,
3633 None => {
3634 log::error!(
3635 "Cannot trigger stop order: order {client_order_id} not found in cache"
3636 );
3637 return;
3638 }
3639 };
3640
3641 match order.order_type() {
3642 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
3643 self.fill_limit_order(client_order_id);
3644 }
3645 OrderType::StopMarket | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
3646 self.fill_market_order(client_order_id);
3647 }
3648 _ => {
3649 log::error!(
3650 "Cannot trigger stop order: invalid order type {}",
3651 order.order_type()
3652 );
3653 }
3654 }
3655 }
3656
3657 fn update_contingent_order(&mut self, order: &OrderAny) {
3658 log::debug!("Updating OUO orders from {}", order.client_order_id());
3659 if let Some(linked_order_ids) = order.linked_order_ids() {
3660 let parent_filled_qty = self
3661 .cached_filled_qty
3662 .get(&order.client_order_id())
3663 .copied()
3664 .unwrap_or(order.filled_qty());
3665 let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
3666
3667 for client_order_id in linked_order_ids {
3668 let mut child_order = match self.cache.borrow().order(client_order_id) {
3669 Some(order) => order.clone(),
3670 None => panic!("Order {client_order_id} not found in cache."),
3671 };
3672
3673 if child_order.is_active_local() {
3674 continue;
3675 }
3676
3677 let child_filled_qty = self
3678 .cached_filled_qty
3679 .get(&child_order.client_order_id())
3680 .copied()
3681 .unwrap_or(child_order.filled_qty());
3682
3683 if parent_leaves_qty.is_zero() {
3684 self.cancel_order(&child_order, Some(false));
3685 } else if child_filled_qty >= parent_leaves_qty {
3686 self.cancel_order(&child_order, Some(false));
3688 } else {
3689 let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
3690 if child_leaves_qty != parent_leaves_qty {
3691 let price = child_order.price();
3692 let trigger_price = child_order.trigger_price();
3693 self.update_order(
3694 &mut child_order,
3695 Some(parent_leaves_qty),
3696 price,
3697 trigger_price,
3698 Some(false),
3699 );
3700 }
3701 }
3702 }
3703 }
3704 }
3705
3706 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
3707 if let Some(linked_order_ids) = order.linked_order_ids() {
3708 for client_order_id in linked_order_ids {
3709 let contingent_order = match self.cache.borrow().order(client_order_id) {
3710 Some(order) => order.clone(),
3711 None => panic!("Cannot find contingent order for {client_order_id}"),
3712 };
3713 if contingent_order.is_active_local() {
3714 continue;
3716 }
3717 if !contingent_order.is_closed() {
3718 self.cancel_order(&contingent_order, Some(false));
3719 }
3720 }
3721 }
3722 }
3723
3724 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
3727 let ts_now = self.clock.borrow().timestamp_ns();
3728 let account_id = order
3729 .account_id()
3730 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3731
3732 let due_post_only = reason.as_str().starts_with("POST_ONLY");
3734
3735 let event = OrderEventAny::Rejected(OrderRejected::new(
3736 order.trader_id(),
3737 order.strategy_id(),
3738 order.instrument_id(),
3739 order.client_order_id(),
3740 account_id,
3741 reason,
3742 UUID4::new(),
3743 ts_now,
3744 ts_now,
3745 false,
3746 due_post_only,
3747 ));
3748 let endpoint = MessagingSwitchboard::exec_engine_process();
3749 msgbus::send_order_event(endpoint, event);
3750 }
3751
3752 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
3753 let ts_now = self.clock.borrow().timestamp_ns();
3754 let account_id = order
3755 .account_id()
3756 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3757 let event = OrderEventAny::Accepted(OrderAccepted::new(
3758 order.trader_id(),
3759 order.strategy_id(),
3760 order.instrument_id(),
3761 order.client_order_id(),
3762 venue_order_id,
3763 account_id,
3764 UUID4::new(),
3765 ts_now,
3766 ts_now,
3767 false,
3768 ));
3769
3770 let endpoint = MessagingSwitchboard::exec_engine_process();
3771 msgbus::send_order_event(endpoint, event);
3772 }
3773
3774 #[allow(clippy::too_many_arguments)]
3775 fn generate_order_modify_rejected(
3776 &self,
3777 trader_id: TraderId,
3778 strategy_id: StrategyId,
3779 instrument_id: InstrumentId,
3780 client_order_id: ClientOrderId,
3781 reason: Ustr,
3782 venue_order_id: Option<VenueOrderId>,
3783 account_id: Option<AccountId>,
3784 ) {
3785 let ts_now = self.clock.borrow().timestamp_ns();
3786 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
3787 trader_id,
3788 strategy_id,
3789 instrument_id,
3790 client_order_id,
3791 reason,
3792 UUID4::new(),
3793 ts_now,
3794 ts_now,
3795 false,
3796 venue_order_id,
3797 account_id,
3798 ));
3799 let endpoint = MessagingSwitchboard::exec_engine_process();
3800 msgbus::send_order_event(endpoint, event);
3801 }
3802
3803 #[allow(clippy::too_many_arguments)]
3804 fn generate_order_cancel_rejected(
3805 &self,
3806 trader_id: TraderId,
3807 strategy_id: StrategyId,
3808 account_id: AccountId,
3809 instrument_id: InstrumentId,
3810 client_order_id: ClientOrderId,
3811 venue_order_id: Option<VenueOrderId>,
3812 reason: Ustr,
3813 ) {
3814 let ts_now = self.clock.borrow().timestamp_ns();
3815 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
3816 trader_id,
3817 strategy_id,
3818 instrument_id,
3819 client_order_id,
3820 reason,
3821 UUID4::new(),
3822 ts_now,
3823 ts_now,
3824 false,
3825 venue_order_id,
3826 Some(account_id),
3827 ));
3828 let endpoint = MessagingSwitchboard::exec_engine_process();
3829 msgbus::send_order_event(endpoint, event);
3830 }
3831
3832 fn generate_order_updated(
3833 &self,
3834 order: &mut OrderAny,
3835 quantity: Quantity,
3836 price: Option<Price>,
3837 trigger_price: Option<Price>,
3838 protection_price: Option<Price>,
3839 ) {
3840 let ts_now = self.clock.borrow().timestamp_ns();
3841 let event = OrderEventAny::Updated(OrderUpdated::new(
3842 order.trader_id(),
3843 order.strategy_id(),
3844 order.instrument_id(),
3845 order.client_order_id(),
3846 quantity,
3847 UUID4::new(),
3848 ts_now,
3849 ts_now,
3850 false,
3851 order.venue_order_id(),
3852 order.account_id(),
3853 price,
3854 trigger_price,
3855 protection_price,
3856 ));
3857
3858 let endpoint = MessagingSwitchboard::exec_engine_process();
3859 msgbus::send_order_event(endpoint, event);
3860 }
3861
3862 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
3863 let ts_now = self.clock.borrow().timestamp_ns();
3864 let event = OrderEventAny::Canceled(OrderCanceled::new(
3865 order.trader_id(),
3866 order.strategy_id(),
3867 order.instrument_id(),
3868 order.client_order_id(),
3869 UUID4::new(),
3870 ts_now,
3871 ts_now,
3872 false,
3873 Some(venue_order_id),
3874 order.account_id(),
3875 ));
3876 let endpoint = MessagingSwitchboard::exec_engine_process();
3877 msgbus::send_order_event(endpoint, event);
3878 }
3879
3880 fn generate_order_triggered(&self, order: &OrderAny) {
3881 let ts_now = self.clock.borrow().timestamp_ns();
3882 let event = OrderEventAny::Triggered(OrderTriggered::new(
3883 order.trader_id(),
3884 order.strategy_id(),
3885 order.instrument_id(),
3886 order.client_order_id(),
3887 UUID4::new(),
3888 ts_now,
3889 ts_now,
3890 false,
3891 order.venue_order_id(),
3892 order.account_id(),
3893 ));
3894 let endpoint = MessagingSwitchboard::exec_engine_process();
3895 msgbus::send_order_event(endpoint, event);
3896 }
3897
3898 fn generate_order_expired(&self, order: &OrderAny) {
3899 let ts_now = self.clock.borrow().timestamp_ns();
3900 let event = OrderEventAny::Expired(OrderExpired::new(
3901 order.trader_id(),
3902 order.strategy_id(),
3903 order.instrument_id(),
3904 order.client_order_id(),
3905 UUID4::new(),
3906 ts_now,
3907 ts_now,
3908 false,
3909 order.venue_order_id(),
3910 order.account_id(),
3911 ));
3912 let endpoint = MessagingSwitchboard::exec_engine_process();
3913 msgbus::send_order_event(endpoint, event);
3914 }
3915
3916 #[allow(clippy::too_many_arguments)]
3917 fn generate_order_filled(
3918 &mut self,
3919 order: &mut OrderAny,
3920 venue_order_id: VenueOrderId,
3921 venue_position_id: Option<PositionId>,
3922 last_qty: Quantity,
3923 last_px: Price,
3924 quote_currency: Currency,
3925 commission: Money,
3926 liquidity_side: LiquiditySide,
3927 ) {
3928 debug_assert!(
3929 last_qty <= order.quantity(),
3930 "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
3931 order_qty = order.quantity(),
3932 client_order_id = order.client_order_id()
3933 );
3934
3935 let ts_now = self.clock.borrow().timestamp_ns();
3936 let account_id = order
3937 .account_id()
3938 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3939 let event = OrderEventAny::Filled(OrderFilled::new(
3940 order.trader_id(),
3941 order.strategy_id(),
3942 order.instrument_id(),
3943 order.client_order_id(),
3944 venue_order_id,
3945 account_id,
3946 self.ids_generator.generate_trade_id(),
3947 order.order_side(),
3948 order.order_type(),
3949 last_qty,
3950 last_px,
3951 quote_currency,
3952 liquidity_side,
3953 UUID4::new(),
3954 ts_now,
3955 ts_now,
3956 false,
3957 venue_position_id,
3958 Some(commission),
3959 ));
3960
3961 let endpoint = MessagingSwitchboard::exec_engine_process();
3962 msgbus::send_order_event(endpoint, event);
3963 }
3964}