Skip to main content

nautilus_execution/matching_engine/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    cell::RefCell,
22    cmp::min,
23    fmt::Debug,
24    ops::{Add, Sub},
25    rc::Rc,
26};
27
28use ahash::AHashMap;
29use chrono::TimeDelta;
30use nautilus_common::{
31    cache::Cache,
32    clock::Clock,
33    messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
34    msgbus::{self, MessagingSwitchboard},
35};
36use nautilus_core::{UUID4, UnixNanos};
37use nautilus_model::{
38    data::{
39        Bar, BarType, InstrumentClose, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
40        QuoteTick, TradeTick, order::BookOrder,
41    },
42    enums::{
43        AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
44        InstrumentCloseType, LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide,
45        OrderSideSpecified, OrderStatus, OrderType, PositionSide, PriceType, TimeInForce,
46        TriggerType,
47    },
48    events::{
49        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
50        OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
51    },
52    identifiers::{
53        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
54        VenueOrderId,
55    },
56    instruments::{Instrument, InstrumentAny},
57    orderbook::OrderBook,
58    orders::{MarketOrder, Order, OrderAny, OrderCore},
59    position::Position,
60    types::{
61        Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
62        quantity::QuantityRaw,
63    },
64};
65use ustr::Ustr;
66
67use crate::{
68    matching_core::{OrderMatchInfo, OrderMatchingCore},
69    matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
70    models::{
71        fee::{FeeModel, FeeModelAny},
72        fill::{FillModel, FillModelAny},
73    },
74    protection::protection_price_calculate,
75    trailing::trailing_stop_calculate,
76};
77
78/// An order matching engine for a single market.
79pub struct OrderMatchingEngine {
80    /// The venue for the matching engine.
81    pub venue: Venue,
82    /// The instrument for the matching engine.
83    pub instrument: InstrumentAny,
84    /// The instruments raw integer ID for the venue.
85    pub raw_id: u32,
86    /// The order book type for the matching engine.
87    pub book_type: BookType,
88    /// The order management system (OMS) type for the matching engine.
89    pub oms_type: OmsType,
90    /// The account type for the matching engine.
91    pub account_type: AccountType,
92    /// The market status for the matching engine.
93    pub market_status: MarketStatus,
94    /// The config for the matching engine.
95    pub config: OrderMatchingEngineConfig,
96    core: OrderMatchingCore,
97    clock: Rc<RefCell<dyn Clock>>,
98    cache: Rc<RefCell<Cache>>,
99    book: OrderBook,
100    fill_model: FillModelAny,
101    fee_model: FeeModelAny,
102    target_bid: Option<Price>,
103    target_ask: Option<Price>,
104    target_last: Option<Price>,
105    last_bar_bid: Option<Bar>,
106    last_bar_ask: Option<Bar>,
107    fill_at_market: bool,
108    execution_bar_types: AHashMap<InstrumentId, BarType>,
109    execution_bar_deltas: AHashMap<BarType, TimeDelta>,
110    account_ids: AHashMap<TraderId, AccountId>,
111    cached_filled_qty: AHashMap<ClientOrderId, Quantity>,
112    ids_generator: IdsGenerator,
113    last_trade_size: Option<Quantity>,
114    bid_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
115    ask_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
116    trade_consumption: QuantityRaw,
117    // Maps client_order_id -> (price_raw, ahead_qty_raw)
118    queue_ahead: AHashMap<ClientOrderId, (PriceRaw, QuantityRaw)>,
119    queue_excess: AHashMap<ClientOrderId, QuantityRaw>,
120    instrument_close: Option<InstrumentClose>,
121    expiration_processed: bool,
122}
123
124impl Debug for OrderMatchingEngine {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct(stringify!(OrderMatchingEngine))
127            .field("venue", &self.venue)
128            .field("instrument", &self.instrument.id())
129            .finish()
130    }
131}
132
133impl OrderMatchingEngine {
134    /// Creates a new [`OrderMatchingEngine`] instance.
135    #[allow(clippy::too_many_arguments)]
136    pub fn new(
137        instrument: InstrumentAny,
138        raw_id: u32,
139        fill_model: FillModelAny,
140        fee_model: FeeModelAny,
141        book_type: BookType,
142        oms_type: OmsType,
143        account_type: AccountType,
144        clock: Rc<RefCell<dyn Clock>>,
145        cache: Rc<RefCell<Cache>>,
146        config: OrderMatchingEngineConfig,
147    ) -> Self {
148        let book = OrderBook::new(instrument.id(), book_type);
149        let core = OrderMatchingCore::new(
150            instrument.id(),
151            instrument.price_increment(),
152            None, // TBD (will be a function on the engine)
153            None, // TBD (will be a function on the engine)
154            None, // TBD (will be a function on the engine)
155        );
156        let ids_generator = IdsGenerator::new(
157            instrument.id().venue,
158            oms_type,
159            raw_id,
160            config.use_random_ids,
161            config.use_position_ids,
162            cache.clone(),
163        );
164
165        Self {
166            venue: instrument.id().venue,
167            instrument,
168            raw_id,
169            fill_model,
170            fee_model,
171            book_type,
172            oms_type,
173            account_type,
174            clock,
175            cache,
176            book,
177            market_status: MarketStatus::Open,
178            config,
179            core,
180            target_bid: None,
181            target_ask: None,
182            target_last: None,
183            last_bar_bid: None,
184            last_bar_ask: None,
185            fill_at_market: true,
186            execution_bar_types: AHashMap::new(),
187            execution_bar_deltas: AHashMap::new(),
188            account_ids: AHashMap::new(),
189            cached_filled_qty: AHashMap::new(),
190            ids_generator,
191            last_trade_size: None,
192            bid_consumption: AHashMap::new(),
193            ask_consumption: AHashMap::new(),
194            trade_consumption: 0,
195            queue_ahead: AHashMap::new(),
196            queue_excess: AHashMap::new(),
197            instrument_close: None,
198            expiration_processed: false,
199        }
200    }
201
202    /// Resets the matching engine to its initial state.
203    ///
204    /// Clears the order book, execution state, cached data, and resets all
205    /// internal components. This is typically used for backtesting scenarios
206    /// where the engine needs to be reset between test runs.
207    pub fn reset(&mut self) {
208        self.book.reset();
209        self.execution_bar_types.clear();
210        self.execution_bar_deltas.clear();
211        self.account_ids.clear();
212        self.cached_filled_qty.clear();
213        self.core.reset();
214        self.target_bid = None;
215        self.target_ask = None;
216        self.target_last = None;
217        self.last_trade_size = None;
218        self.bid_consumption.clear();
219        self.ask_consumption.clear();
220        self.trade_consumption = 0;
221        self.queue_ahead.clear();
222        self.queue_excess.clear();
223        self.instrument_close = None;
224        self.expiration_processed = false;
225        self.fill_at_market = true;
226        self.ids_generator.reset();
227
228        log::info!("Reset {}", self.instrument.id());
229    }
230
231    fn apply_liquidity_consumption(
232        &mut self,
233        fills: Vec<(Price, Quantity)>,
234        order_side: OrderSide,
235        leaves_qty: Quantity,
236        book_prices: Option<&[Price]>,
237    ) -> Vec<(Price, Quantity)> {
238        if !self.config.liquidity_consumption {
239            return fills;
240        }
241
242        let consumption = match order_side {
243            OrderSide::Buy => &mut self.ask_consumption,
244            OrderSide::Sell => &mut self.bid_consumption,
245            _ => return fills,
246        };
247
248        let mut adjusted_fills = Vec::with_capacity(fills.len());
249        let mut remaining_qty = leaves_qty.raw;
250
251        for (fill_idx, (price, qty)) in fills.into_iter().enumerate() {
252            if remaining_qty == 0 {
253                break;
254            }
255
256            // Use book_price for consumption tracking (original price before MAKER adjustment),
257            // but use price (potentially adjusted) for the output fill.
258            let book_price = book_prices
259                .and_then(|bp| bp.get(fill_idx).copied())
260                .unwrap_or(price);
261
262            let book_price_raw = book_price.raw;
263            let level_size = self
264                .book
265                .get_quantity_at_level(book_price, order_side, qty.precision);
266
267            let (original_size, consumed) = consumption
268                .entry(book_price_raw)
269                .or_insert((level_size.raw, 0));
270
271            // Reset consumption when book size changes (fresh data)
272            if *original_size != level_size.raw {
273                *original_size = level_size.raw;
274                *consumed = 0;
275            }
276
277            let available = original_size.saturating_sub(*consumed);
278            if available == 0 {
279                continue;
280            }
281
282            let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
283            if adjusted_qty_raw == 0 {
284                continue;
285            }
286
287            *consumed += adjusted_qty_raw;
288            remaining_qty -= adjusted_qty_raw;
289
290            let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
291            adjusted_fills.push((price, adjusted_qty));
292        }
293
294        adjusted_fills
295    }
296
297    /// Sets the fill model for the matching engine.
298    pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
299        self.fill_model = fill_model;
300    }
301
302    fn snapshot_queue_position(&mut self, order: &OrderAny, price: Price) {
303        if !self.config.queue_position {
304            return;
305        }
306        let size_prec = self.instrument.size_precision();
307
308        // Pass opposite side because get_quantity_at_level flips internally
309        // (BUY reads asks, SELL reads bids). We want the resting side depth.
310        let qty_ahead = self.book.get_quantity_at_level(
311            price,
312            OrderCore::opposite_side(order.order_side()),
313            size_prec,
314        );
315        self.queue_ahead
316            .insert(order.client_order_id(), (price.raw, qty_ahead.raw));
317    }
318
319    fn decrement_queue_on_trade(
320        &mut self,
321        price_raw: PriceRaw,
322        trade_size_raw: QuantityRaw,
323        aggressor_side: AggressorSide,
324    ) {
325        if !self.config.queue_position {
326            return;
327        }
328
329        self.queue_excess.clear();
330
331        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
332        let mut entries: Vec<(ClientOrderId, QuantityRaw, QuantityRaw)> = Vec::new();
333        let mut stale: Vec<ClientOrderId> = Vec::new();
334
335        for client_order_id in keys {
336            let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
337            {
338                Some(v) => v,
339                None => continue,
340            };
341
342            let cache = self.cache.borrow();
343            let order_info = cache.order(&client_order_id).and_then(|order| {
344                if order.is_closed() {
345                    None
346                } else {
347                    Some((order.order_side(), order.leaves_qty().raw))
348                }
349            });
350            drop(cache);
351
352            let Some((order_side, leaves_raw)) = order_info else {
353                stale.push(client_order_id);
354                continue;
355            };
356
357            if order_price_raw != price_raw || ahead_raw == 0 {
358                continue;
359            }
360
361            let should_decrement = matches!(aggressor_side, AggressorSide::NoAggressor)
362                || (aggressor_side == AggressorSide::Buyer && order_side == OrderSide::Sell)
363                || (aggressor_side == AggressorSide::Seller && order_side == OrderSide::Buy);
364
365            if should_decrement {
366                entries.push((client_order_id, ahead_raw, leaves_raw));
367            }
368        }
369
370        for id in stale {
371            self.queue_ahead.remove(&id);
372        }
373
374        // Sort by queue position (earliest first) for shared budget allocation
375        entries.sort_by_key(|&(_, ahead, _)| ahead);
376
377        let mut remaining = trade_size_raw;
378        let mut prev_position: QuantityRaw = 0;
379
380        for (client_order_id, ahead_raw, leaves_raw) in &entries {
381            if remaining == 0 {
382                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
383                self.queue_ahead
384                    .insert(*client_order_id, (price_raw, new_ahead));
385                if new_ahead == 0 {
386                    // Queue cleared but no trade volume left for this order
387                    self.queue_excess.insert(*client_order_id, 0);
388                }
389                continue;
390            }
391
392            // Consume the gap between previous position and this order's depth
393            let gap = ahead_raw.saturating_sub(prev_position);
394            let queue_consumed = remaining.min(gap);
395            remaining -= queue_consumed;
396
397            if remaining == 0 && queue_consumed < gap {
398                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
399                self.queue_ahead
400                    .insert(*client_order_id, (price_raw, new_ahead));
401                continue;
402            }
403
404            self.queue_ahead.insert(*client_order_id, (price_raw, 0));
405            let excess = remaining.min(*leaves_raw);
406            self.queue_excess.insert(*client_order_id, excess);
407            remaining -= excess;
408            prev_position = ahead_raw + excess;
409        }
410    }
411
412    fn determine_trade_fill_qty(&self, order: &OrderAny) -> Option<QuantityRaw> {
413        if !self.config.queue_position {
414            return Some(order.leaves_qty().raw);
415        }
416
417        let client_order_id = order.client_order_id();
418
419        if let Some(&(tracked_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
420            && let Some(order_price) = order.price()
421            && order_price.raw == tracked_price_raw
422            && ahead_raw > 0
423        {
424            // Allow fill when a current trade has crossed through the order's price
425            let crossed = self.last_trade_size.is_some()
426                && self
427                    .core
428                    .last
429                    .is_some_and(|trade_price| match order.order_side() {
430                        OrderSide::Buy => trade_price.raw < order_price.raw,
431                        OrderSide::Sell => trade_price.raw > order_price.raw,
432                        _ => false,
433                    });
434            if !crossed {
435                return None;
436            }
437        }
438
439        let leaves_raw = order.leaves_qty().raw;
440        if leaves_raw == 0 {
441            return None;
442        }
443
444        let mut available_raw = leaves_raw;
445
446        // Cap by remaining trade volume and queue excess (only during trade processing)
447        if let Some(trade_size) = self.last_trade_size {
448            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
449            available_raw = available_raw.min(remaining);
450
451            if let Some(&excess_raw) = self.queue_excess.get(&client_order_id) {
452                if excess_raw == 0 {
453                    return None;
454                }
455                available_raw = available_raw.min(excess_raw);
456            }
457        }
458
459        if available_raw == 0 {
460            return None;
461        }
462
463        Some(available_raw)
464    }
465
466    fn clear_all_queue_positions(&mut self) {
467        for (_, (_, ahead_raw)) in &mut self.queue_ahead {
468            *ahead_raw = 0;
469        }
470    }
471
472    fn clear_queue_on_delete(&mut self, deleted_price_raw: PriceRaw, deleted_side: OrderSide) {
473        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
474        for client_order_id in keys {
475            if let Some(&(order_price_raw, _)) = self.queue_ahead.get(&client_order_id)
476                && order_price_raw == deleted_price_raw
477            {
478                let cache = self.cache.borrow();
479                if let Some(order) = cache.order(&client_order_id)
480                    && order.order_side() == deleted_side
481                {
482                    drop(cache);
483                    self.queue_ahead
484                        .insert(client_order_id, (order_price_raw, 0));
485                }
486            }
487        }
488    }
489
490    fn cap_queue_on_update(
491        &mut self,
492        price_raw: PriceRaw,
493        new_size_raw: QuantityRaw,
494        side: OrderSide,
495    ) {
496        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
497        for client_order_id in keys {
498            if let Some(&(order_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
499                && order_price_raw == price_raw
500                && ahead_raw > new_size_raw
501            {
502                let cache = self.cache.borrow();
503                if let Some(order) = cache.order(&client_order_id)
504                    && order.order_side() == side
505                {
506                    drop(cache);
507                    self.queue_ahead
508                        .insert(client_order_id, (order_price_raw, new_size_raw));
509                }
510            }
511        }
512    }
513
514    #[must_use]
515    /// Returns the best bid price from the order book.
516    pub fn best_bid_price(&self) -> Option<Price> {
517        self.book.best_bid_price()
518    }
519
520    #[must_use]
521    /// Returns the best ask price from the order book.
522    pub fn best_ask_price(&self) -> Option<Price> {
523        self.book.best_ask_price()
524    }
525
526    #[must_use]
527    /// Returns a reference to the internal order book.
528    pub const fn get_book(&self) -> &OrderBook {
529        &self.book
530    }
531
532    #[must_use]
533    /// Returns all open bid orders managed by the matching core.
534    pub const fn get_open_bid_orders(&self) -> &[OrderMatchInfo] {
535        self.core.get_orders_bid()
536    }
537
538    #[must_use]
539    /// Returns all open ask orders managed by the matching core.
540    pub const fn get_open_ask_orders(&self) -> &[OrderMatchInfo] {
541        self.core.get_orders_ask()
542    }
543
544    #[must_use]
545    /// Returns all open orders from both bid and ask sides.
546    pub fn get_open_orders(&self) -> Vec<OrderMatchInfo> {
547        let mut orders = Vec::new();
548        orders.extend_from_slice(self.core.get_orders_bid());
549        orders.extend_from_slice(self.core.get_orders_ask());
550        orders
551    }
552
553    #[must_use]
554    /// Returns true if an order with the given client order ID exists in the matching engine.
555    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
556        self.core.order_exists(client_order_id)
557    }
558
559    #[must_use]
560    pub const fn get_core(&self) -> &OrderMatchingCore {
561        &self.core
562    }
563
564    pub fn get_core_mut(&mut self) -> &mut OrderMatchingCore {
565        &mut self.core
566    }
567
568    pub fn set_fill_at_market(&mut self, value: bool) {
569        self.fill_at_market = value;
570    }
571
572    // -- DATA PROCESSING -------------------------------------------------------------------------
573
574    fn check_price_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
575        let expected = self.instrument.price_precision();
576        if actual != expected {
577            anyhow::bail!(
578                "Invalid {field} precision {actual}, expected {expected} for {}",
579                self.instrument.id()
580            );
581        }
582        Ok(())
583    }
584
585    fn check_size_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
586        let expected = self.instrument.size_precision();
587        if actual != expected {
588            anyhow::bail!(
589                "Invalid {field} precision {actual}, expected {expected} for {}",
590                self.instrument.id()
591            );
592        }
593        Ok(())
594    }
595
596    /// Process the venues market for the given order book delta.
597    ///
598    /// # Errors
599    ///
600    /// - If delta order price precision does not match the instrument (for Add/Update actions).
601    /// - If delta order size precision does not match the instrument (for Add/Update actions).
602    /// - If applying the delta to the book fails.
603    pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
604        log::debug!("Processing {delta}");
605
606        // Validate precision for Add and Update actions (Delete/Clear may have NULL_ORDER)
607        if matches!(delta.action, BookAction::Add | BookAction::Update) {
608            self.check_price_precision(delta.order.price.precision, "delta order price")?;
609            self.check_size_precision(delta.order.size.precision, "delta order size")?;
610        }
611
612        if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
613            self.book.apply_delta(delta)?;
614        }
615
616        if self.config.queue_position {
617            if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
618                self.clear_all_queue_positions();
619            } else if delta.action == BookAction::Delete {
620                self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
621            } else if delta.action == BookAction::Update {
622                self.cap_queue_on_update(
623                    delta.order.price.raw,
624                    delta.order.size.raw,
625                    delta.order.side,
626                );
627            }
628        }
629
630        self.iterate(delta.ts_init, AggressorSide::NoAggressor);
631        Ok(())
632    }
633
634    /// Process the venues market for the given order book deltas.
635    ///
636    /// # Errors
637    ///
638    /// - If any delta order price precision does not match the instrument (for Add/Update actions).
639    /// - If any delta order size precision does not match the instrument (for Add/Update actions).
640    /// - If applying the deltas to the book fails.
641    pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
642        log::debug!("Processing {deltas}");
643
644        // Validate precision for Add and Update actions (Delete/Clear may have NULL_ORDER)
645        for delta in &deltas.deltas {
646            if matches!(delta.action, BookAction::Add | BookAction::Update) {
647                self.check_price_precision(delta.order.price.precision, "delta order price")?;
648                self.check_size_precision(delta.order.size.precision, "delta order size")?;
649            }
650        }
651
652        if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
653            self.book.apply_deltas(deltas)?;
654        }
655
656        if self.config.queue_position {
657            for delta in &deltas.deltas {
658                if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
659                    self.clear_all_queue_positions();
660                    break;
661                } else if delta.action == BookAction::Delete {
662                    self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
663                } else if delta.action == BookAction::Update {
664                    self.cap_queue_on_update(
665                        delta.order.price.raw,
666                        delta.order.size.raw,
667                        delta.order.side,
668                    );
669                }
670            }
671        }
672
673        self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
674        Ok(())
675    }
676
677    /// Process the venues market for the given order book depth10.
678    ///
679    /// # Errors
680    ///
681    /// - If any bid/ask price precision does not match the instrument.
682    /// - If any bid/ask size precision does not match the instrument.
683    /// - If applying the depth to the book fails.
684    /// - If updating the L1 order book with the top-of-book quote fails.
685    pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) -> anyhow::Result<()> {
686        log::debug!("Processing OrderBookDepth10 for {}", depth.instrument_id);
687
688        // Validate precision for non-padding entries
689        for order in &depth.bids {
690            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
691                continue;
692            }
693            self.check_price_precision(order.price.precision, "bid price")?;
694            self.check_size_precision(order.size.precision, "bid size")?;
695        }
696        for order in &depth.asks {
697            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
698                continue;
699            }
700            self.check_price_precision(order.price.precision, "ask price")?;
701            self.check_size_precision(order.size.precision, "ask size")?;
702        }
703
704        // For L1 books, only apply top-of-book to avoid mispricing
705        // against worst-level entries when full depth is applied
706        if self.book_type == BookType::L1_MBP {
707            let quote = QuoteTick::new(
708                depth.instrument_id,
709                depth.bids[0].price,
710                depth.asks[0].price,
711                depth.bids[0].size,
712                depth.asks[0].size,
713                depth.ts_event,
714                depth.ts_init,
715            );
716            self.book.update_quote_tick(&quote)?;
717        } else {
718            self.book.apply_depth(depth)?;
719        }
720
721        // Depth snapshots replace the entire book, clear all queue positions
722        if self.config.queue_position {
723            self.clear_all_queue_positions();
724        }
725
726        self.iterate(depth.ts_init, AggressorSide::NoAggressor);
727        Ok(())
728    }
729
730    /// Processes a quote tick to update the market state.
731    ///
732    /// # Panics
733    ///
734    /// - If updating the order book with the quote tick fails.
735    /// - If bid/ask price precision does not match the instrument.
736    /// - If bid/ask size precision does not match the instrument.
737    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
738        log::debug!("Processing {quote}");
739
740        self.check_price_precision(quote.bid_price.precision, "bid_price")
741            .unwrap();
742        self.check_price_precision(quote.ask_price.precision, "ask_price")
743            .unwrap();
744        self.check_size_precision(quote.bid_size.precision, "bid_size")
745            .unwrap();
746        self.check_size_precision(quote.ask_size.precision, "ask_size")
747            .unwrap();
748
749        if self.book_type == BookType::L1_MBP {
750            self.book.update_quote_tick(quote).unwrap();
751        }
752
753        self.iterate(quote.ts_init, AggressorSide::NoAggressor);
754    }
755
756    /// Processes a bar and simulates market dynamics by creating synthetic ticks.
757    ///
758    /// For L1 books with bar execution enabled, generates synthetic trade or quote
759    /// ticks from bar OHLC data to drive order matching.
760    ///
761    /// # Panics
762    ///
763    /// - If the bar type configuration is missing a time delta.
764    /// - If bar OHLC price precision does not match the instrument.
765    /// - If bar volume precision does not match the instrument.
766    pub fn process_bar(&mut self, bar: &Bar) {
767        log::debug!("Processing {bar}");
768
769        // Check if configured for bar execution can only process an L1 book with bars
770        if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
771            return;
772        }
773
774        let bar_type = bar.bar_type;
775        // Do not process internally aggregated bars
776        if bar_type.aggregation_source() == AggregationSource::Internal {
777            return;
778        }
779
780        self.check_price_precision(bar.open.precision, "bar open")
781            .unwrap();
782        self.check_price_precision(bar.high.precision, "bar high")
783            .unwrap();
784        self.check_price_precision(bar.low.precision, "bar low")
785            .unwrap();
786        self.check_price_precision(bar.close.precision, "bar close")
787            .unwrap();
788        self.check_size_precision(bar.volume.precision, "bar volume")
789            .unwrap();
790
791        let execution_bar_type =
792            if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
793                execution_bar_type.to_owned()
794            } else {
795                self.execution_bar_types
796                    .insert(bar.instrument_id(), bar_type);
797                self.execution_bar_deltas
798                    .insert(bar_type, bar_type.spec().timedelta());
799                bar_type
800            };
801
802        if execution_bar_type != bar_type {
803            let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
804            if bar_type_timedelta.is_none() {
805                bar_type_timedelta = Some(bar_type.spec().timedelta());
806                self.execution_bar_deltas
807                    .insert(bar_type, bar_type_timedelta.unwrap());
808            }
809            if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
810                >= &bar_type_timedelta.unwrap()
811            {
812                self.execution_bar_types
813                    .insert(bar_type.instrument_id(), bar_type);
814            } else {
815                return;
816            }
817        }
818
819        match bar_type.spec().price_type {
820            PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
821            PriceType::Bid => {
822                self.last_bar_bid = Some(bar.to_owned());
823                self.process_quote_ticks_from_bar(bar);
824            }
825            PriceType::Ask => {
826                self.last_bar_ask = Some(bar.to_owned());
827                self.process_quote_ticks_from_bar(bar);
828            }
829            PriceType::Mark => panic!("Not implemented"),
830        }
831    }
832
833    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
834        // Split the bar into 4 trades, adding remainder to close trade
835        let quarter_raw = bar.volume.raw / 4;
836        let remainder_raw = bar.volume.raw % 4;
837        let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
838        let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);
839
840        let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
841        {
842            AggressorSide::Buyer
843        } else {
844            AggressorSide::Seller
845        };
846
847        // Create reusable trade tick
848        let mut trade_tick = TradeTick::new(
849            bar.instrument_id(),
850            bar.open,
851            size,
852            aggressor_side,
853            self.ids_generator.generate_trade_id(),
854            bar.ts_init,
855            bar.ts_init,
856        );
857
858        // Open: fill at market price (gap from previous bar)
859        if !self.core.is_last_initialized {
860            self.fill_at_market = true;
861            self.book.update_trade_tick(&trade_tick).unwrap();
862            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
863            self.core.set_last_raw(trade_tick.price);
864        } else if self.core.last.is_some_and(|last| bar.open != last) {
865            // Gap between previous close and this bar's open
866            self.fill_at_market = true;
867            self.book.update_trade_tick(&trade_tick).unwrap();
868            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
869            self.core.set_last_raw(trade_tick.price);
870        }
871
872        // Determine high/low processing order.
873        // Default: O→H→L→C. With adaptive ordering, swap if low is closer to open.
874        let high_first = !self.config.bar_adaptive_high_low_ordering
875            || (bar.high.raw - bar.open.raw).abs() < (bar.low.raw - bar.open.raw).abs();
876
877        if high_first {
878            self.process_bar_high(&mut trade_tick, bar);
879            self.process_bar_low(&mut trade_tick, bar);
880        } else {
881            self.process_bar_low(&mut trade_tick, bar);
882            self.process_bar_high(&mut trade_tick, bar);
883        }
884
885        // Close: fill at trigger price (market moving through prices)
886        if self.core.last.is_some_and(|last| bar.close != last) {
887            self.fill_at_market = false;
888            trade_tick.price = bar.close;
889            trade_tick.size = close_size;
890            if bar.close > self.core.last.unwrap() {
891                trade_tick.aggressor_side = AggressorSide::Buyer;
892            } else {
893                trade_tick.aggressor_side = AggressorSide::Seller;
894            }
895            trade_tick.trade_id = self.ids_generator.generate_trade_id();
896
897            self.book.update_trade_tick(&trade_tick).unwrap();
898            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
899
900            self.core.set_last_raw(trade_tick.price);
901        }
902
903        self.fill_at_market = true;
904    }
905
906    fn process_bar_high(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
907        if self.core.last.is_some_and(|last| bar.high > last) {
908            self.fill_at_market = false;
909            trade_tick.price = bar.high;
910            trade_tick.aggressor_side = AggressorSide::Buyer;
911            trade_tick.trade_id = self.ids_generator.generate_trade_id();
912
913            self.book.update_trade_tick(trade_tick).unwrap();
914            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
915
916            self.core.set_last_raw(trade_tick.price);
917        }
918    }
919
920    fn process_bar_low(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
921        if self.core.last.is_some_and(|last| bar.low < last) {
922            self.fill_at_market = false;
923            trade_tick.price = bar.low;
924            trade_tick.aggressor_side = AggressorSide::Seller;
925            trade_tick.trade_id = self.ids_generator.generate_trade_id();
926
927            self.book.update_trade_tick(trade_tick).unwrap();
928            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
929
930            self.core.set_last_raw(trade_tick.price);
931        }
932    }
933
934    fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
935        // Wait for next bar
936        if self.last_bar_bid.is_none()
937            || self.last_bar_ask.is_none()
938            || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
939        {
940            return;
941        }
942        let bid_bar = self.last_bar_bid.unwrap();
943        let ask_bar = self.last_bar_ask.unwrap();
944
945        // Split bar volume into 4, adding remainder to close quote
946        let bid_quarter = bid_bar.volume.raw / 4;
947        let bid_remainder = bid_bar.volume.raw % 4;
948        let ask_quarter = ask_bar.volume.raw / 4;
949        let ask_remainder = ask_bar.volume.raw % 4;
950
951        let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
952        let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
953        let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
954        let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
955
956        // Create reusable quote tick
957        let mut quote_tick = QuoteTick::new(
958            self.book.instrument_id,
959            bid_bar.open,
960            ask_bar.open,
961            bid_size,
962            ask_size,
963            bid_bar.ts_init,
964            bid_bar.ts_init,
965        );
966
967        // Open: fill at market price (gap from previous bar)
968        self.fill_at_market = true;
969        self.book.update_quote_tick(&quote_tick).unwrap();
970        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
971
972        // High: fill at trigger price (market moving through prices)
973        self.fill_at_market = false;
974        quote_tick.bid_price = bid_bar.high;
975        quote_tick.ask_price = ask_bar.high;
976        self.book.update_quote_tick(&quote_tick).unwrap();
977        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
978
979        // Low: fill at trigger price (market moving through prices)
980        self.fill_at_market = false;
981        quote_tick.bid_price = bid_bar.low;
982        quote_tick.ask_price = ask_bar.low;
983        self.book.update_quote_tick(&quote_tick).unwrap();
984        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
985
986        // Close: fill at trigger price (market moving through prices)
987        self.fill_at_market = false;
988        quote_tick.bid_price = bid_bar.close;
989        quote_tick.ask_price = ask_bar.close;
990        quote_tick.bid_size = bid_close_size;
991        quote_tick.ask_size = ask_close_size;
992        self.book.update_quote_tick(&quote_tick).unwrap();
993        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
994
995        self.last_bar_bid = None;
996        self.last_bar_ask = None;
997        self.fill_at_market = true;
998    }
999
    /// Processes a trade tick to update the market state.
    ///
    /// For L1 books, always updates the order book with the trade tick to maintain
    /// market state. When `trade_execution` is disabled, order matching and maintenance
    /// operations (GTD order expiry, trailing stop activation, instrument expiration)
    /// are skipped. These maintenance operations will run on the next quote tick or bar.
    ///
    /// When `trade_execution` is enabled, the core bid/ask are first widened so the trade
    /// price lies within them, then one side (or both, for `NoAggressor`) is temporarily
    /// set to the trade price while queue positions are decremented and a matching
    /// iteration runs. For `Buyer`/`Seller` trades the overridden side is restored
    /// afterwards; for `NoAggressor` trades both sides remain at the trade price.
    ///
    /// # Panics
    ///
    /// - If updating the order book with the trade tick fails.
    /// - If trade price precision does not match the instrument.
    /// - If trade size precision does not match the instrument.
    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
        log::debug!("Processing {trade}");

        self.check_price_precision(trade.price.precision, "trade price")
            .unwrap();
        self.check_size_precision(trade.size.precision, "trade size")
            .unwrap();

        let price_raw = trade.price.raw;

        // Only an L1 book is updated directly from trades
        if self.book_type == BookType::L1_MBP {
            self.book.update_trade_tick(trade).unwrap();
        }

        // The last price is recorded regardless of whether trades drive execution
        self.core.set_last_raw(trade.price);

        if !self.config.trade_execution {
            // Sync core to L1 book, skip order matching
            if self.book_type == BookType::L1_MBP {
                if let Some(bid) = self.book.best_bid_price() {
                    self.core.set_bid_raw(bid);
                }
                if let Some(ask) = self.book.best_ask_price() {
                    self.core.set_ask_raw(ask);
                }
            }
            return;
        }

        let aggressor_side = trade.aggressor_side;

        // Step 1: initialize any missing side and widen bid/ask so that
        // bid <= trade price <= ask. The `map_or` defaults are never used because the
        // preceding `is_none()` check short-circuits first.
        match aggressor_side {
            AggressorSide::Buyer => {
                if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
                    self.core.set_ask_raw(trade.price);
                }
                if self.core.bid.is_none()
                    || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                {
                    self.core.set_bid_raw(trade.price);
                }
            }
            AggressorSide::Seller => {
                if self.core.bid.is_none()
                    || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                {
                    self.core.set_bid_raw(trade.price);
                }
                if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
                    self.core.set_ask_raw(trade.price);
                }
            }
            AggressorSide::NoAggressor => {
                // Inclusive comparisons: a trade exactly at the bid/ask also resets that side
                if self.core.bid.is_none()
                    || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                {
                    self.core.set_bid_raw(trade.price);
                }
                if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
                    self.core.set_ask_raw(trade.price);
                }
            }
        }

        // Snapshot the widened bid/ask so the temporary override below can be undone
        let original_bid = self.core.bid;
        let original_ask = self.core.ask;

        // Step 2: temporarily move one side to the trade price for the matching pass.
        // A seller-initiated trade below the ask pulls the ask down to the trade price;
        // a buyer-initiated trade above the bid lifts the bid up to the trade price.
        match aggressor_side {
            AggressorSide::Seller => {
                if original_ask.is_some_and(|ask| price_raw < ask.raw) {
                    self.core.set_ask_raw(trade.price);
                }
            }
            AggressorSide::Buyer => {
                if original_bid.is_some_and(|bid| price_raw > bid.raw) {
                    self.core.set_bid_raw(trade.price);
                }
            }
            AggressorSide::NoAggressor => {
                // Force both sides to trade price (parity with Cython)
                self.core.set_bid_raw(trade.price);
                self.core.set_ask_raw(trade.price);
            }
        }

        // Trade size and consumption counter are only set for the duration of this pass
        self.last_trade_size = Some(trade.size);
        self.trade_consumption = 0;

        self.decrement_queue_on_trade(price_raw, trade.size.raw, aggressor_side);
        self.iterate(trade.ts_init, aggressor_side);

        self.last_trade_size = None;
        self.trade_consumption = 0;

        // Restore original bid/ask after temporary trade price override
        // (same conditions as the override above; `NoAggressor` is left at the trade price)
        match aggressor_side {
            AggressorSide::Seller => {
                if let Some(ask) = original_ask
                    && price_raw < ask.raw
                {
                    self.core.ask = Some(ask);
                }
            }
            AggressorSide::Buyer => {
                if let Some(bid) = original_bid
                    && price_raw > bid.raw
                {
                    self.core.bid = Some(bid);
                }
            }
            AggressorSide::NoAggressor => {}
        }
    }
1125
1126    /// Processes a market status action to update the market state.
1127    pub fn process_status(&mut self, action: MarketStatusAction) {
1128        log::debug!("Processing {action}");
1129
1130        // Check if market is closed and market opens with trading or pre-open status
1131        if self.market_status == MarketStatus::Closed
1132            && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
1133        {
1134            self.market_status = MarketStatus::Open;
1135        }
1136        // Check if market is open and market pauses
1137        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
1138            self.market_status = MarketStatus::Paused;
1139        }
1140        // Check if market is open and market suspends
1141        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
1142            self.market_status = MarketStatus::Suspended;
1143        }
1144        // Check if market is open and we halt or close
1145        if self.market_status == MarketStatus::Open
1146            && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
1147        {
1148            self.market_status = MarketStatus::Closed;
1149        }
1150    }
1151
1152    /// Processes an instrument close event.
1153    ///
1154    /// For `ContractExpired` close types, stores the close and triggers expiration
1155    /// processing which cancels all open orders and closes all open positions.
1156    pub fn process_instrument_close(&mut self, close: InstrumentClose) {
1157        if close.instrument_id != self.instrument.id() {
1158            log::warn!(
1159                "Received instrument close for unknown instrument_id: {}",
1160                close.instrument_id
1161            );
1162            return;
1163        }
1164
1165        if close.close_type == InstrumentCloseType::ContractExpired {
1166            self.instrument_close = Some(close);
1167            self.iterate(close.ts_init, AggressorSide::NoAggressor);
1168        }
1169    }
1170
1171    fn check_instrument_expiration(&mut self) {
1172        if self.expiration_processed || self.instrument_close.is_none() {
1173            return;
1174        }
1175
1176        self.expiration_processed = true;
1177        let close = self.instrument_close.take().unwrap();
1178        log::info!("{} reached expiration", self.instrument.id());
1179
1180        let open_orders: Vec<OrderMatchInfo> = self.get_open_orders();
1181        for order_info in &open_orders {
1182            let order = {
1183                let cache = self.cache.borrow();
1184                cache.order(&order_info.client_order_id).cloned()
1185            };
1186            if let Some(order) = order {
1187                self.cancel_order(&order, None);
1188            }
1189        }
1190
1191        let instrument_id = self.instrument.id();
1192        let positions: Vec<(TraderId, StrategyId, PositionId, OrderSide, Quantity)> = {
1193            let cache = self.cache.borrow();
1194            cache
1195                .positions_open(None, Some(&instrument_id), None, None, None)
1196                .into_iter()
1197                .map(|pos| {
1198                    let closing_side = match pos.side {
1199                        PositionSide::Long => OrderSide::Sell,
1200                        PositionSide::Short => OrderSide::Buy,
1201                        _ => OrderSide::NoOrderSide,
1202                    };
1203                    (
1204                        pos.trader_id,
1205                        pos.strategy_id,
1206                        pos.id,
1207                        closing_side,
1208                        pos.quantity,
1209                    )
1210                })
1211                .collect()
1212        };
1213
1214        let ts_now = self.clock.borrow().timestamp_ns();
1215
1216        for (trader_id, strategy_id, position_id, closing_side, quantity) in positions {
1217            let client_order_id =
1218                ClientOrderId::from(format!("EXPIRATION-{}-{}", self.venue, UUID4::new()).as_str());
1219            let mut order = OrderAny::Market(MarketOrder::new(
1220                trader_id,
1221                strategy_id,
1222                instrument_id,
1223                client_order_id,
1224                closing_side,
1225                quantity,
1226                TimeInForce::Gtc,
1227                UUID4::new(),
1228                ts_now,
1229                true, // reduce_only
1230                false,
1231                None,
1232                None,
1233                None,
1234                None,
1235                None,
1236                None,
1237                None,
1238                Some(vec![Ustr::from(&format!(
1239                    "EXPIRATION_{}_CLOSE",
1240                    self.venue
1241                ))]),
1242            ));
1243
1244            if self
1245                .cache
1246                .borrow_mut()
1247                .add_order(order.clone(), Some(position_id), None, false)
1248                .is_err()
1249            {
1250                log::debug!("Expiration order already in cache: {client_order_id}");
1251            }
1252
1253            let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
1254            self.generate_order_accepted(&mut order, venue_order_id);
1255            self.apply_fills(
1256                &mut order,
1257                vec![(close.close_price, quantity)],
1258                LiquiditySide::Taker,
1259                Some(position_id),
1260                None,
1261            );
1262        }
1263    }
1264
1265    // -- TRADING COMMANDS ------------------------------------------------------------------------
1266
1267    /// Processes a new order submission.
1268    ///
1269    /// Validates the order against instrument precision, expiration, and contingency
1270    /// rules before accepting or rejecting it.
1271    ///
1272    /// # Panics
1273    ///
1274    /// Panics if an OTO child order references a missing or non-OTO parent.
1275    #[allow(clippy::needless_return)]
1276    pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
1277        // Enter the scope where you will borrow a cache
1278        {
1279            let cache_borrow = self.cache.as_ref().borrow();
1280
1281            if self.core.order_exists(order.client_order_id()) {
1282                self.generate_order_rejected(order, "Order already exists".into());
1283                return;
1284            }
1285
1286            // Index identifiers
1287            self.account_ids.insert(order.trader_id(), account_id);
1288
1289            // Check for instrument expiration or activation
1290            if self.instrument.has_expiration() {
1291                if let Some(activation_ns) = self.instrument.activation_ns()
1292                    && self.clock.borrow().timestamp_ns() < activation_ns
1293                {
1294                    self.generate_order_rejected(
1295                        order,
1296                        format!(
1297                            "Contract {} is not yet active, activation {activation_ns}",
1298                            self.instrument.id(),
1299                        )
1300                        .into(),
1301                    );
1302                    return;
1303                }
1304                if let Some(expiration_ns) = self.instrument.expiration_ns()
1305                    && self.clock.borrow().timestamp_ns() >= expiration_ns
1306                {
1307                    self.generate_order_rejected(
1308                        order,
1309                        format!(
1310                            "Contract {} has expired, expiration {expiration_ns}",
1311                            self.instrument.id(),
1312                        )
1313                        .into(),
1314                    );
1315                    return;
1316                }
1317            }
1318
1319            // Contingent orders checks
1320            if self.config.support_contingent_orders {
1321                if let Some(parent_order_id) = order.parent_order_id() {
1322                    let parent_order = cache_borrow.order(&parent_order_id);
1323                    if parent_order.is_none()
1324                        || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
1325                    {
1326                        panic!("OTO parent not found");
1327                    }
1328                    if let Some(parent_order) = parent_order {
1329                        let parent_order_status = parent_order.status();
1330                        let order_is_open = order.is_open();
1331                        if parent_order.status() == OrderStatus::Rejected && order.is_open() {
1332                            self.generate_order_rejected(
1333                                order,
1334                                format!("Rejected OTO order from {parent_order_id}").into(),
1335                            );
1336                            return;
1337                        } else if parent_order.status() == OrderStatus::Accepted
1338                            && parent_order.status() == OrderStatus::Triggered
1339                        {
1340                            log::info!(
1341                                "Pending OTO order {} triggers from {parent_order_id}",
1342                                order.client_order_id(),
1343                            );
1344                            return;
1345                        }
1346                    }
1347                }
1348
1349                if let Some(linked_order_ids) = order.linked_order_ids() {
1350                    for client_order_id in linked_order_ids {
1351                        match cache_borrow.order(client_order_id) {
1352                            Some(contingent_order)
1353                                if (order.contingency_type().unwrap() == ContingencyType::Oco
1354                                    || order.contingency_type().unwrap()
1355                                        == ContingencyType::Ouo)
1356                                    && !order.is_closed()
1357                                    && contingent_order.is_closed() =>
1358                            {
1359                                self.generate_order_rejected(
1360                                    order,
1361                                    format!("Contingent order {client_order_id} already closed")
1362                                        .into(),
1363                                );
1364                                return;
1365                            }
1366                            None => panic!("Cannot find contingent order for {client_order_id}"),
1367                            _ => {}
1368                        }
1369                    }
1370                }
1371            }
1372
1373            // Check for valid order quantity precision
1374            if order.quantity().precision != self.instrument.size_precision() {
1375                self.generate_order_rejected(
1376                    order,
1377                    format!(
1378                        "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
1379                        order.client_order_id(),
1380                        order.quantity().precision,
1381                        self.instrument.id(),
1382                        self.instrument.size_precision()
1383                    )
1384                        .into(),
1385                );
1386                return;
1387            }
1388
1389            // Check for valid order price precision
1390            if let Some(price) = order.price()
1391                && price.precision != self.instrument.price_precision()
1392            {
1393                self.generate_order_rejected(
1394                        order,
1395                        format!(
1396                            "Invalid order price precision for order {}, was {} when {} price precision is {}",
1397                            order.client_order_id(),
1398                            price.precision,
1399                            self.instrument.id(),
1400                            self.instrument.price_precision()
1401                        )
1402                            .into(),
1403                    );
1404                return;
1405            }
1406
1407            // Check for valid order trigger price precision
1408            if let Some(trigger_price) = order.trigger_price()
1409                && trigger_price.precision != self.instrument.price_precision()
1410            {
1411                self.generate_order_rejected(
1412                        order,
1413                        format!(
1414                            "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
1415                            order.client_order_id(),
1416                            trigger_price.precision,
1417                            self.instrument.id(),
1418                            self.instrument.price_precision()
1419                        )
1420                            .into(),
1421                    );
1422                return;
1423            }
1424
1425            // Get position if exists
1426            let position: Option<&Position> = cache_borrow
1427                .position_for_order(&order.client_order_id())
1428                .or_else(|| {
1429                    if self.oms_type == OmsType::Netting {
1430                        let position_id = PositionId::new(
1431                            format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
1432                        );
1433                        cache_borrow.position(&position_id)
1434                    } else {
1435                        None
1436                    }
1437                });
1438
1439            // Check not shorting an equity without a MARGIN account
1440            if order.order_side() == OrderSide::Sell
1441                && self.account_type != AccountType::Margin
1442                && matches!(self.instrument, InstrumentAny::Equity(_))
1443                && (position.is_none()
1444                    || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
1445            {
1446                let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
1447                self.generate_order_rejected(
1448                    order,
1449                    format!(
1450                        "Short selling not permitted on a CASH account with position {position_string} and order {order}",
1451                    )
1452                        .into(),
1453                );
1454                return;
1455            }
1456
1457            // Check reduce-only instruction
1458            if self.config.use_reduce_only
1459                && order.is_reduce_only()
1460                && !order.is_closed()
1461                && position.is_none_or(|pos| {
1462                    pos.is_closed()
1463                        || (order.is_buy() && pos.is_long())
1464                        || (order.is_sell() && pos.is_short())
1465                })
1466            {
1467                self.generate_order_rejected(
1468                    order,
1469                    format!(
1470                        "Reduce-only order {} ({}-{}) would have increased position",
1471                        order.client_order_id(),
1472                        order.order_type().to_string().to_uppercase(),
1473                        order.order_side().to_string().to_uppercase()
1474                    )
1475                    .into(),
1476                );
1477                return;
1478            }
1479        }
1480
1481        match order.order_type() {
1482            OrderType::Market => self.process_market_order(order),
1483            OrderType::Limit => self.process_limit_order(order),
1484            OrderType::MarketToLimit => self.process_market_to_limit_order(order),
1485            OrderType::StopMarket => self.process_stop_market_order(order),
1486            OrderType::StopLimit => self.process_stop_limit_order(order),
1487            OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
1488            OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
1489            OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
1490            OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
1491        }
1492    }
1493
1494    /// Processes an order modify command to update quantity, price, or trigger price.
1495    pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
1496        if !self.core.order_exists(command.client_order_id) {
1497            self.generate_order_modify_rejected(
1498                command.trader_id,
1499                command.strategy_id,
1500                command.instrument_id,
1501                command.client_order_id,
1502                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1503                command.venue_order_id,
1504                Some(account_id),
1505            );
1506            return;
1507        }
1508
1509        let mut order = match self.cache.borrow().order(&command.client_order_id).cloned() {
1510            Some(order) => order,
1511            None => {
1512                log::error!(
1513                    "Cannot modify order: order {} not found in cache",
1514                    command.client_order_id
1515                );
1516                return;
1517            }
1518        };
1519
1520        let update_success = self.update_order(
1521            &mut order,
1522            command.quantity,
1523            command.price,
1524            command.trigger_price,
1525            None,
1526        );
1527
1528        // Only persist changes if update succeeded and order is still open
1529        if update_success && order.is_open() {
1530            let _ = self.core.delete_order(command.client_order_id);
1531            let match_info = OrderMatchInfo::new(
1532                order.client_order_id(),
1533                order.order_side().as_specified(),
1534                order.order_type(),
1535                order.trigger_price(),
1536                order.price(),
1537                true,
1538            );
1539            self.core.add_order(match_info);
1540
1541            if self.config.queue_position
1542                && let Some(new_price) = order.price()
1543            {
1544                self.snapshot_queue_position(&order, new_price);
1545                self.queue_excess.remove(&order.client_order_id());
1546            }
1547        }
1548    }
1549
1550    /// Processes an order cancel command.
1551    pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
1552        if !self.core.order_exists(command.client_order_id) {
1553            self.generate_order_cancel_rejected(
1554                command.trader_id,
1555                command.strategy_id,
1556                account_id,
1557                command.instrument_id,
1558                command.client_order_id,
1559                command.venue_order_id,
1560                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1561            );
1562            return;
1563        }
1564
1565        let order = match self.cache.borrow().order(&command.client_order_id).cloned() {
1566            Some(order) => order,
1567            None => {
1568                log::error!(
1569                    "Cannot cancel order: order {} not found in cache",
1570                    command.client_order_id
1571                );
1572                return;
1573            }
1574        };
1575
1576        if order.is_inflight() || order.is_open() {
1577            self.cancel_order(&order, None);
1578        }
1579    }
1580
1581    /// Processes a cancel all orders command for an instrument.
1582    pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
1583        let instrument_id = command.instrument_id;
1584        let open_orders = self
1585            .cache
1586            .borrow()
1587            .orders_open(None, Some(&instrument_id), None, None, None)
1588            .into_iter()
1589            .cloned()
1590            .collect::<Vec<OrderAny>>();
1591        for order in open_orders {
1592            if command.order_side != OrderSide::NoOrderSide
1593                && command.order_side != order.order_side()
1594            {
1595                continue;
1596            }
1597            if order.is_inflight() || order.is_open() {
1598                self.cancel_order(&order, None);
1599            }
1600        }
1601    }
1602
1603    /// Processes a batch cancel orders command.
1604    pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
1605        for order in &command.cancels {
1606            self.process_cancel(order, account_id);
1607        }
1608    }
1609
1610    fn process_market_order(&mut self, order: &mut OrderAny) {
1611        if order.time_in_force() == TimeInForce::AtTheOpen
1612            || order.time_in_force() == TimeInForce::AtTheClose
1613        {
1614            log::error!(
1615                "Market auction for the time in force {} is currently not supported",
1616                order.time_in_force()
1617            );
1618            return;
1619        }
1620
1621        // Check if market exists
1622        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1623            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1624        {
1625            self.generate_order_rejected(
1626                order,
1627                format!("No market for {}", order.instrument_id()).into(),
1628            );
1629            return;
1630        }
1631
1632        if self.config.use_market_order_acks {
1633            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1634            self.generate_order_accepted(order, venue_order_id);
1635        }
1636
1637        // Add order to cache for fill_market_order to fetch
1638        if let Err(e) = self
1639            .cache
1640            .borrow_mut()
1641            .add_order(order.clone(), None, None, false)
1642        {
1643            log::debug!("Order already in cache: {e}");
1644        }
1645
1646        self.fill_market_order(order.client_order_id());
1647    }
1648
1649    fn process_limit_order(&mut self, order: &mut OrderAny) {
1650        let limit_px = order.price().expect("Limit order must have a price");
1651        if order.is_post_only()
1652            && self
1653                .core
1654                .is_limit_matched(order.order_side_specified(), limit_px)
1655        {
1656            self.generate_order_rejected(
1657                order,
1658                format!(
1659                    "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
1660                    order.order_type(),
1661                    order.order_side(),
1662                    order.price().unwrap(),
1663                    self.core
1664                        .bid
1665                        .map_or_else(|| "None".to_string(), |p| p.to_string()),
1666                    self.core
1667                        .ask
1668                        .map_or_else(|| "None".to_string(), |p| p.to_string())
1669                )
1670                .into(),
1671            );
1672            return;
1673        }
1674
1675        // Order is valid and accepted
1676        self.accept_order(order);
1677
1678        // Check for immediate fill
1679        if self
1680            .core
1681            .is_limit_matched(order.order_side_specified(), limit_px)
1682        {
1683            // Filling as liquidity taker
1684            order.set_liquidity_side(LiquiditySide::Taker);
1685
1686            if self
1687                .cache
1688                .borrow_mut()
1689                .add_order(order.clone(), None, None, false)
1690                .is_err()
1691                && let Err(e) = self.cache.borrow_mut().update_order(order)
1692            {
1693                log::debug!("Failed to update order in cache: {e}");
1694            }
1695            self.fill_limit_order(order.client_order_id());
1696        } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1697            self.cancel_order(order, None);
1698        } else {
1699            // Add passive order to cache for later modify/cancel operations
1700            order.set_liquidity_side(LiquiditySide::Maker);
1701
1702            if let Some(price) = order.price() {
1703                self.snapshot_queue_position(order, price);
1704            }
1705
1706            if let Err(e) = self
1707                .cache
1708                .borrow_mut()
1709                .add_order(order.clone(), None, None, false)
1710            {
1711                log::debug!("Order already in cache: {e}");
1712            }
1713        }
1714    }
1715
1716    fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
1717        // Check that market exists
1718        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1719            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1720        {
1721            self.generate_order_rejected(
1722                order,
1723                format!("No market for {}", order.instrument_id()).into(),
1724            );
1725            return;
1726        }
1727
1728        if self.config.use_market_order_acks {
1729            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1730            self.generate_order_accepted(order, venue_order_id);
1731        }
1732
1733        // Immediately fill marketable order
1734        if let Err(e) = self
1735            .cache
1736            .borrow_mut()
1737            .add_order(order.clone(), None, None, false)
1738        {
1739            log::debug!("Order already in cache: {e}");
1740        }
1741        let client_order_id = order.client_order_id();
1742        self.fill_market_order(client_order_id);
1743
1744        // Check for remaining quantity to rest as limit order
1745        let filled_qty = self
1746            .cached_filled_qty
1747            .get(&client_order_id)
1748            .copied()
1749            .unwrap_or_default();
1750        let leaves_qty = order.quantity().saturating_sub(filled_qty);
1751        if !leaves_qty.is_zero() {
1752            // Re-fetch from cache to get updated price from partial fill
1753            let updated_order = self.cache.borrow().order(&client_order_id).cloned();
1754            if let Some(mut updated_order) = updated_order {
1755                self.accept_order(&mut updated_order);
1756            }
1757        }
1758    }
1759
1760    fn process_stop_market_order(&mut self, order: &mut OrderAny) {
1761        let stop_px = order
1762            .trigger_price()
1763            .expect("Stop order must have a trigger price");
1764        if self
1765            .core
1766            .is_stop_matched(order.order_side_specified(), stop_px)
1767        {
1768            if self.config.reject_stop_orders {
1769                self.generate_order_rejected(
1770                    order,
1771                    format!(
1772                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1773                        order.order_type(),
1774                        order.order_side(),
1775                        order.trigger_price().unwrap(),
1776                        self.core
1777                            .bid
1778                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1779                        self.core
1780                            .ask
1781                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1782                    ).into(),
1783                );
1784                return;
1785            }
1786            if let Err(e) = self
1787                .cache
1788                .borrow_mut()
1789                .add_order(order.clone(), None, None, false)
1790            {
1791                log::debug!("Order already in cache: {e}");
1792            }
1793            self.fill_market_order(order.client_order_id());
1794            return;
1795        }
1796
1797        // order is not matched but is valid and we accept it
1798        self.accept_order(order);
1799
1800        // Add passive order to cache for later modify/cancel operations
1801        order.set_liquidity_side(LiquiditySide::Maker);
1802        if let Err(e) = self
1803            .cache
1804            .borrow_mut()
1805            .add_order(order.clone(), None, None, false)
1806        {
1807            log::debug!("Order already in cache: {e}");
1808        }
1809    }
1810
1811    fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
1812        let stop_px = order
1813            .trigger_price()
1814            .expect("Stop order must have a trigger price");
1815        if self
1816            .core
1817            .is_stop_matched(order.order_side_specified(), stop_px)
1818        {
1819            if self.config.reject_stop_orders {
1820                self.generate_order_rejected(
1821                    order,
1822                    format!(
1823                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1824                        order.order_type(),
1825                        order.order_side(),
1826                        order.trigger_price().unwrap(),
1827                        self.core
1828                            .bid
1829                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1830                        self.core
1831                            .ask
1832                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1833                    ).into(),
1834                );
1835                return;
1836            }
1837
1838            self.accept_order(order);
1839            self.generate_order_triggered(order);
1840
1841            // Check for immediate fill
1842            let limit_px = order.price().expect("Stop limit order must have a price");
1843            if self
1844                .core
1845                .is_limit_matched(order.order_side_specified(), limit_px)
1846            {
1847                order.set_liquidity_side(LiquiditySide::Taker);
1848                if let Err(e) = self
1849                    .cache
1850                    .borrow_mut()
1851                    .add_order(order.clone(), None, None, false)
1852                {
1853                    log::debug!("Order already in cache: {e}");
1854                }
1855                self.fill_limit_order(order.client_order_id());
1856            }
1857
1858            // Order was triggered (and possibly filled), don't accept again
1859            return;
1860        }
1861
1862        self.accept_order(order);
1863
1864        // Add passive order to cache for later modify/cancel operations
1865        order.set_liquidity_side(LiquiditySide::Maker);
1866        if let Err(e) = self
1867            .cache
1868            .borrow_mut()
1869            .add_order(order.clone(), None, None, false)
1870        {
1871            log::debug!("Order already in cache: {e}");
1872        }
1873    }
1874
1875    fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
1876        if self
1877            .core
1878            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1879        {
1880            if self.config.reject_stop_orders {
1881                self.generate_order_rejected(
1882                    order,
1883                    format!(
1884                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1885                        order.order_type(),
1886                        order.order_side(),
1887                        order.trigger_price().unwrap(),
1888                        self.core
1889                            .bid
1890                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1891                        self.core
1892                            .ask
1893                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1894                    ).into(),
1895                );
1896                return;
1897            }
1898            if let Err(e) = self
1899                .cache
1900                .borrow_mut()
1901                .add_order(order.clone(), None, None, false)
1902            {
1903                log::debug!("Order already in cache: {e}");
1904            }
1905            self.fill_market_order(order.client_order_id());
1906            return;
1907        }
1908
1909        // Order is valid and accepted
1910        self.accept_order(order);
1911
1912        // Add passive order to cache for later modify/cancel operations
1913        order.set_liquidity_side(LiquiditySide::Maker);
1914        if let Err(e) = self
1915            .cache
1916            .borrow_mut()
1917            .add_order(order.clone(), None, None, false)
1918        {
1919            log::debug!("Order already in cache: {e}");
1920        }
1921    }
1922
1923    fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1924        if self
1925            .core
1926            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1927        {
1928            if self.config.reject_stop_orders {
1929                self.generate_order_rejected(
1930                    order,
1931                    format!(
1932                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1933                        order.order_type(),
1934                        order.order_side(),
1935                        order.trigger_price().unwrap(),
1936                        self.core
1937                            .bid
1938                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1939                        self.core
1940                            .ask
1941                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1942                    ).into(),
1943                );
1944                return;
1945            }
1946            self.accept_order(order);
1947            self.generate_order_triggered(order);
1948
1949            // Check if immediate marketable
1950            if self
1951                .core
1952                .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1953            {
1954                order.set_liquidity_side(LiquiditySide::Taker);
1955                if let Err(e) = self
1956                    .cache
1957                    .borrow_mut()
1958                    .add_order(order.clone(), None, None, false)
1959                {
1960                    log::debug!("Order already in cache: {e}");
1961                }
1962                self.fill_limit_order(order.client_order_id());
1963            }
1964            return;
1965        }
1966
1967        // Order is valid and accepted
1968        self.accept_order(order);
1969
1970        // Add passive order to cache for later modify/cancel operations
1971        order.set_liquidity_side(LiquiditySide::Maker);
1972        if let Err(e) = self
1973            .cache
1974            .borrow_mut()
1975            .add_order(order.clone(), None, None, false)
1976        {
1977            log::debug!("Order already in cache: {e}");
1978        }
1979    }
1980
1981    fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1982        if let Some(trigger_price) = order.trigger_price()
1983            && self
1984                .core
1985                .is_stop_matched(order.order_side_specified(), trigger_price)
1986        {
1987            self.generate_order_rejected(
1988                    order,
1989                    format!(
1990                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1991                        order.order_type(),
1992                        order.order_side(),
1993                        trigger_price,
1994                        self.core
1995                            .bid
1996                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1997                        self.core
1998                            .ask
1999                            .map_or_else(|| "None".to_string(), |p| p.to_string())
2000                    ).into(),
2001                );
2002            return;
2003        }
2004
2005        // Order is valid and accepted
2006        self.accept_order(order);
2007
2008        // Add passive order to cache for later modify/cancel operations
2009        order.set_liquidity_side(LiquiditySide::Maker);
2010        if let Err(e) = self
2011            .cache
2012            .borrow_mut()
2013            .add_order(order.clone(), None, None, false)
2014        {
2015            log::debug!("Order already in cache: {e}");
2016        }
2017    }
2018
2019    // -- ORDER PROCESSING ----------------------------------------------------
2020
2021    /// Iterate the matching engine by processing the bid and ask order sides
2022    /// and advancing time up to the given UNIX `timestamp_ns`.
2023    ///
2024    /// The `aggressor_side` parameter is used for trade execution processing.
2025    /// When not `NoAggressor`, the book-based bid/ask reset is skipped to preserve
2026    /// transient trade price overrides.
2027    pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
2028        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
2029
2030        // Only reset bid/ask from book when not processing trade execution
2031        // (preserves transient trade price override for L2/L3 books)
2032        if aggressor_side == AggressorSide::NoAggressor {
2033            if let Some(bid) = self.book.best_bid_price() {
2034                self.core.set_bid_raw(bid);
2035            }
2036            if let Some(ask) = self.book.best_ask_price() {
2037                self.core.set_ask_raw(ask);
2038            }
2039        }
2040
2041        // Process expiration before matching to prevent fills on expired instruments
2042        self.check_instrument_expiration();
2043
2044        self.core.iterate();
2045
2046        let orders_bid = self.core.get_orders_bid().to_vec();
2047        let orders_ask = self.core.get_orders_ask().to_vec();
2048
2049        self.iterate_orders(timestamp_ns, &orders_bid);
2050        self.iterate_orders(timestamp_ns, &orders_ask);
2051
2052        // Restore core bid/ask to book values after order iteration
2053        // (during trade execution, transient override was used for matching)
2054        self.core.bid = self.book.best_bid_price();
2055        self.core.ask = self.book.best_ask_price();
2056    }
2057
2058    fn get_trailing_activation_price(
2059        &self,
2060        trigger_type: TriggerType,
2061        order_side: OrderSide,
2062        bid: Option<Price>,
2063        ask: Option<Price>,
2064        last: Option<Price>,
2065    ) -> Option<Price> {
2066        match trigger_type {
2067            TriggerType::LastPrice => last,
2068            TriggerType::LastOrBidAsk => last.or(match order_side {
2069                OrderSide::Buy => ask,
2070                OrderSide::Sell => bid,
2071                _ => None,
2072            }),
2073            // Default, BidAsk, DoubleBidAsk, DoubleLastPrice, IndexPrice, MarkPrice
2074            _ => match order_side {
2075                OrderSide::Buy => ask,
2076                OrderSide::Sell => bid,
2077                _ => None,
2078            },
2079        }
2080    }
2081
2082    fn maybe_activate_trailing_stop(
2083        &mut self,
2084        order: &mut OrderAny,
2085        bid: Option<Price>,
2086        ask: Option<Price>,
2087        last: Option<Price>,
2088    ) -> bool {
2089        match order {
2090            OrderAny::TrailingStopMarket(inner) => {
2091                if inner.is_activated {
2092                    return true;
2093                }
2094
2095                if inner.activation_price.is_none() {
2096                    let px = self.get_trailing_activation_price(
2097                        inner.trigger_type,
2098                        inner.order_side(),
2099                        bid,
2100                        ask,
2101                        last,
2102                    );
2103                    if let Some(p) = px {
2104                        inner.activation_price = Some(p);
2105                        inner.set_activated();
2106                        if let Err(e) = self.cache.borrow_mut().update_order(order) {
2107                            log::error!("Failed to update order: {e}");
2108                        }
2109                        return true;
2110                    }
2111                    return false;
2112                }
2113
2114                let activation_price = inner.activation_price.unwrap();
2115                let hit = match inner.order_side() {
2116                    OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
2117                    OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
2118                    _ => false,
2119                };
2120                if hit {
2121                    inner.set_activated();
2122                    if let Err(e) = self.cache.borrow_mut().update_order(order) {
2123                        log::error!("Failed to update order: {e}");
2124                    }
2125                }
2126                hit
2127            }
2128            OrderAny::TrailingStopLimit(inner) => {
2129                if inner.is_activated {
2130                    return true;
2131                }
2132
2133                if inner.activation_price.is_none() {
2134                    let px = self.get_trailing_activation_price(
2135                        inner.trigger_type,
2136                        inner.order_side(),
2137                        bid,
2138                        ask,
2139                        last,
2140                    );
2141                    if let Some(p) = px {
2142                        inner.activation_price = Some(p);
2143                        inner.set_activated();
2144                        if let Err(e) = self.cache.borrow_mut().update_order(order) {
2145                            log::error!("Failed to update order: {e}");
2146                        }
2147                        return true;
2148                    }
2149                    return false;
2150                }
2151
2152                let activation_price = inner.activation_price.unwrap();
2153                let hit = match inner.order_side() {
2154                    OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
2155                    OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
2156                    _ => false,
2157                };
2158                if hit {
2159                    inner.set_activated();
2160                    if let Err(e) = self.cache.borrow_mut().update_order(order) {
2161                        log::error!("Failed to update order: {e}");
2162                    }
2163                }
2164                hit
2165            }
2166            _ => true,
2167        }
2168    }
2169
2170    fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[OrderMatchInfo]) {
2171        for match_info in orders {
2172            let order = match self
2173                .cache
2174                .borrow()
2175                .order(&match_info.client_order_id)
2176                .cloned()
2177            {
2178                Some(order) => order,
2179                None => {
2180                    log::warn!(
2181                        "Order {} not found in cache during iteration, skipping",
2182                        match_info.client_order_id
2183                    );
2184                    continue;
2185                }
2186            };
2187
2188            if order.is_closed() {
2189                continue;
2190            }
2191
2192            if self.config.support_gtd_orders
2193                && order
2194                    .expire_time()
2195                    .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
2196            {
2197                let _ = self.core.delete_order(match_info.client_order_id);
2198                self.cached_filled_qty.remove(&match_info.client_order_id);
2199                self.expire_order(&order);
2200                continue;
2201            }
2202
2203            if matches!(
2204                match_info.order_type,
2205                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
2206            ) {
2207                let mut any = order;
2208
2209                if !self.maybe_activate_trailing_stop(
2210                    &mut any,
2211                    self.core.bid,
2212                    self.core.ask,
2213                    self.core.last,
2214                ) {
2215                    continue;
2216                }
2217
2218                self.update_trailing_stop_order(&mut any);
2219
2220                // Persist the activated/updated trailing stop back to the core
2221                let _ = self.core.delete_order(match_info.client_order_id);
2222                let updated_match_info = OrderMatchInfo::new(
2223                    any.client_order_id(),
2224                    any.order_side().as_specified(),
2225                    any.order_type(),
2226                    any.trigger_price(),
2227                    any.price(),
2228                    match &any {
2229                        OrderAny::TrailingStopMarket(o) => o.is_activated,
2230                        OrderAny::TrailingStopLimit(o) => o.is_activated,
2231                        _ => true,
2232                    },
2233                );
2234                self.core.add_order(updated_match_info);
2235            }
2236
2237            // Move market back to targets
2238            if let Some(target_bid) = self.target_bid {
2239                self.core.bid = Some(target_bid);
2240                self.target_bid = None;
2241            }
2242            if let Some(target_bid) = self.target_bid.take() {
2243                self.core.bid = Some(target_bid);
2244                self.target_bid = None;
2245            }
2246            if let Some(target_ask) = self.target_ask.take() {
2247                self.core.ask = Some(target_ask);
2248                self.target_ask = None;
2249            }
2250            if let Some(target_last) = self.target_last.take() {
2251                self.core.last = Some(target_last);
2252                self.target_last = None;
2253            }
2254        }
2255
2256        // Reset any targets after iteration
2257        self.target_bid = None;
2258        self.target_ask = None;
2259        self.target_last = None;
2260    }
2261
2262    fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
2263        match order.price() {
2264            Some(order_price) => {
2265                // When liquidity consumption is enabled, get ALL crossed levels so that
2266                // consumed levels can be filtered out while still finding valid ones.
2267                // Otherwise simulate_fills only returns enough levels to satisfy leaves_qty,
2268                // which may all be consumed, missing other valid crossed levels.
2269                let mut fills = if self.config.liquidity_consumption {
2270                    let size_prec = self.instrument.size_precision();
2271                    self.book
2272                        .get_all_crossed_levels(order.order_side(), order_price, size_prec)
2273                } else {
2274                    let book_order =
2275                        BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
2276                    self.book.simulate_fills(&book_order)
2277                };
2278
2279                // Trade execution: use trade-driven fill when book doesn't reflect trade price
2280                if let Some(trade_size) = self.last_trade_size
2281                    && let Some(trade_price) = self.core.last
2282                {
2283                    let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);
2284
2285                    if !fills_at_trade_price
2286                        && self
2287                            .core
2288                            .is_limit_matched(order.order_side_specified(), order_price)
2289                    {
2290                        // Fill model check for MAKER at limit is already handled in fill_limit_order,
2291                        // don't re-check here to avoid calling is_limit_filled() twice (p² probability).
2292                        let leaves_qty = order.leaves_qty();
2293                        let available_qty = if self.config.liquidity_consumption {
2294                            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
2295                            Quantity::from_raw(remaining, trade_size.precision)
2296                        } else {
2297                            trade_size
2298                        };
2299
2300                        let fill_qty = min(leaves_qty, available_qty);
2301
2302                        if !fill_qty.is_zero() {
2303                            log::debug!(
2304                                "Trade execution fill: {} @ {} (trade_price={}, available: {}, book had {} fills)",
2305                                fill_qty,
2306                                order_price,
2307                                trade_price,
2308                                available_qty,
2309                                fills.len()
2310                            );
2311
2312                            if self.config.liquidity_consumption {
2313                                self.trade_consumption += fill_qty.raw;
2314                            }
2315
2316                            // Fill at the limit price (conservative) rather than the trade price.
2317                            // Trade execution fills already account for consumption via trade_consumption,
2318                            // return early to bypass apply_liquidity_consumption which would incorrectly
2319                            // discard these fills when the trade price isn't in the order book.
2320                            return vec![(order_price, fill_qty)];
2321                        }
2322                    }
2323                }
2324
2325                // Return immediately if no fills
2326                if fills.is_empty() {
2327                    return fills;
2328                }
2329
2330                // Save original book prices BEFORE any fill price modifications for consumption tracking,
2331                // since the TAKER and MAKER loops below may adjust fill prices. Consumption should be
2332                // tracked against the original book price levels where liquidity was sourced from.
2333                let book_prices: Vec<Price> = if self.config.liquidity_consumption {
2334                    fills.iter().map(|(px, _)| *px).collect()
2335                } else {
2336                    Vec::new()
2337                };
2338                let book_prices_ref: Option<&[Price]> = if book_prices.is_empty() {
2339                    None
2340                } else {
2341                    Some(&book_prices)
2342                };
2343
2344                // check if trigger price exists
2345                if let Some(triggered_price) = order.trigger_price() {
2346                    // Filling as TAKER from trigger
2347                    if order
2348                        .liquidity_side()
2349                        .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
2350                    {
2351                        if order.order_side() == OrderSide::Sell && order_price > triggered_price {
2352                            // manually change the fills index 0
2353                            let first_fill = fills.first().unwrap();
2354                            let triggered_qty = first_fill.1;
2355                            fills[0] = (triggered_price, triggered_qty);
2356                            self.target_bid = self.core.bid;
2357                            self.target_ask = self.core.ask;
2358                            self.target_last = self.core.last;
2359                            self.core.set_ask_raw(order_price);
2360                            self.core.set_last_raw(order_price);
2361                        } else if order.order_side() == OrderSide::Buy
2362                            && order_price < triggered_price
2363                        {
2364                            // manually change the fills index 0
2365                            let first_fill = fills.first().unwrap();
2366                            let triggered_qty = first_fill.1;
2367                            fills[0] = (triggered_price, triggered_qty);
2368                            self.target_bid = self.core.bid;
2369                            self.target_ask = self.core.ask;
2370                            self.target_last = self.core.last;
2371                            self.core.set_bid_raw(order_price);
2372                            self.core.set_last_raw(order_price);
2373                        }
2374                    }
2375                }
2376
2377                // Filling as MAKER from trigger
2378                if order
2379                    .liquidity_side()
2380                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
2381                {
2382                    match order.order_side().as_specified() {
2383                        OrderSideSpecified::Buy => {
2384                            let target_price = if order
2385                                .trigger_price()
2386                                .is_some_and(|trigger_price| order_price > trigger_price)
2387                            {
2388                                order.trigger_price().unwrap()
2389                            } else {
2390                                order_price
2391                            };
2392                            for fill in &mut fills {
2393                                let last_px = fill.0;
2394                                if last_px < order_price {
2395                                    // Marketable BUY would have filled at limit
2396                                    self.target_bid = self.core.bid;
2397                                    self.target_ask = self.core.ask;
2398                                    self.target_last = self.core.last;
2399                                    self.core.set_ask_raw(target_price);
2400                                    self.core.set_last_raw(target_price);
2401                                    fill.0 = target_price;
2402                                }
2403                            }
2404                        }
2405                        OrderSideSpecified::Sell => {
2406                            let target_price = if order
2407                                .trigger_price()
2408                                .is_some_and(|trigger_price| order_price < trigger_price)
2409                            {
2410                                order.trigger_price().unwrap()
2411                            } else {
2412                                order_price
2413                            };
2414                            for fill in &mut fills {
2415                                let last_px = fill.0;
2416                                if last_px > order_price {
2417                                    // Marketable SELL would have filled at limit
2418                                    self.target_bid = self.core.bid;
2419                                    self.target_ask = self.core.ask;
2420                                    self.target_last = self.core.last;
2421                                    self.core.set_bid_raw(target_price);
2422                                    self.core.set_last_raw(target_price);
2423                                    fill.0 = target_price;
2424                                }
2425                            }
2426                        }
2427                    }
2428                }
2429
2430                self.apply_liquidity_consumption(
2431                    fills,
2432                    order.order_side(),
2433                    order.leaves_qty(),
2434                    book_prices_ref,
2435                )
2436            }
2437            None => panic!("Limit order must have a price"),
2438        }
2439    }
2440
2441    fn determine_market_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
2442        let price = match order.order_side().as_specified() {
2443            OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
2444            OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
2445        };
2446
2447        // When liquidity consumption is enabled, get ALL crossed levels so that
2448        // consumed levels can be filtered out while still finding valid ones.
2449        let mut fills = if self.config.liquidity_consumption {
2450            let size_prec = self.instrument.size_precision();
2451            self.book
2452                .get_all_crossed_levels(order.order_side(), price, size_prec)
2453        } else {
2454            let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
2455            self.book.simulate_fills(&book_order)
2456        };
2457
2458        // For stop market and market-if-touched orders during bar H/L/C processing, fill at trigger price
2459        // (market moved through the trigger). For gaps/immediate triggers, fill at market.
2460        if !self.fill_at_market
2461            && self.book_type == BookType::L1_MBP
2462            && !fills.is_empty()
2463            && matches!(
2464                order.order_type(),
2465                OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
2466            )
2467            && let Some(trigger_price) = order.trigger_price()
2468        {
2469            fills[0] = (trigger_price, fills[0].1);
2470
2471            // Skip liquidity consumption for trigger price fills (gap price may not exist in book).
2472            let mut remaining_qty = order.leaves_qty().raw;
2473            let mut capped_fills = Vec::with_capacity(fills.len());
2474
2475            for (price, qty) in fills {
2476                if remaining_qty == 0 {
2477                    break;
2478                }
2479
2480                let capped_qty_raw = min(qty.raw, remaining_qty);
2481                if capped_qty_raw == 0 {
2482                    continue;
2483                }
2484
2485                remaining_qty -= capped_qty_raw;
2486                capped_fills.push((price, Quantity::from_raw(capped_qty_raw, qty.precision)));
2487            }
2488
2489            return capped_fills;
2490        }
2491
2492        fills
2493    }
2494
2495    fn determine_market_fill_model_price_and_volume(
2496        &mut self,
2497        order: &OrderAny,
2498    ) -> (Vec<(Price, Quantity)>, bool) {
2499        if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
2500            && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
2501                &self.instrument,
2502                order,
2503                best_bid,
2504                best_ask,
2505            )
2506        {
2507            let price = match order.order_side().as_specified() {
2508                OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
2509                OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
2510            };
2511            let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
2512            let fills = book.simulate_fills(&book_order);
2513            if !fills.is_empty() {
2514                return (fills, true);
2515            }
2516        }
2517        (self.determine_market_price_and_volume(order), false)
2518    }
2519
2520    fn determine_limit_fill_model_price_and_volume(
2521        &mut self,
2522        order: &OrderAny,
2523    ) -> Vec<(Price, Quantity)> {
2524        if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
2525            && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
2526                &self.instrument,
2527                order,
2528                best_bid,
2529                best_ask,
2530            )
2531            && let Some(limit_price) = order.price()
2532        {
2533            let book_order = BookOrder::new(order.order_side(), limit_price, order.quantity(), 0);
2534            let fills = book.simulate_fills(&book_order);
2535            if !fills.is_empty() {
2536                return fills;
2537            }
2538        }
2539        self.determine_limit_price_and_volume(order)
2540    }
2541
2542    /// Fills a market order against the current order book.
2543    ///
2544    /// The order is filled as a taker against available liquidity.
2545    /// Reduce-only orders are canceled if no position exists.
2546    pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
2547        let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
2548            Some(order) => order,
2549            None => {
2550                log::error!("Cannot fill market order: order {client_order_id} not found in cache");
2551                return;
2552            }
2553        };
2554
2555        if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
2556            && filled_qty >= &order.quantity()
2557        {
2558            log::info!(
2559                "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
2560                filled_qty,
2561                order.quantity(),
2562                order.filled_qty(),
2563                order.quantity()
2564            );
2565            return;
2566        }
2567
2568        let venue_position_id = self.ids_generator.get_position_id(&order, Some(true));
2569        let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
2570            let cache = self.cache.as_ref().borrow();
2571            cache.position(&venue_position_id).cloned()
2572        } else {
2573            None
2574        };
2575
2576        if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
2577            log::warn!(
2578                "Canceling REDUCE_ONLY {} as would increase position",
2579                order.order_type()
2580            );
2581            self.cancel_order(&order, None);
2582            return;
2583        }
2584
2585        order.set_liquidity_side(LiquiditySide::Taker);
2586        let (mut fills, from_synthetic) = self.determine_market_fill_model_price_and_volume(&order);
2587
2588        // Apply protection price filtering at fill time (trigger-time semantics for stops)
2589        if let Some(protection_points) = self.config.price_protection_points
2590            && matches!(
2591                order.order_type(),
2592                OrderType::Market | OrderType::StopMarket
2593            )
2594            && let Ok(protection_price) = protection_price_calculate(
2595                self.instrument.price_increment(),
2596                &order,
2597                protection_points,
2598                self.core.bid,
2599                self.core.ask,
2600            )
2601        {
2602            fills = self.filter_fills_by_protection(fills, &order, protection_price);
2603        }
2604
2605        // Skip consumption for synthetic fill-model books (prices may not exist
2606        // in the real book) and trigger price fills (gap price may not exist)
2607        let is_trigger_price_fill = !self.fill_at_market
2608            && self.book_type == BookType::L1_MBP
2609            && matches!(
2610                order.order_type(),
2611                OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
2612            )
2613            && order.trigger_price().is_some();
2614
2615        if !from_synthetic && !is_trigger_price_fill {
2616            fills = self.apply_liquidity_consumption(
2617                fills,
2618                order.order_side(),
2619                order.leaves_qty(),
2620                None,
2621            );
2622        }
2623
2624        self.apply_fills(&mut order, fills, LiquiditySide::Taker, None, position);
2625    }
2626
2627    fn filter_fills_by_protection(
2628        &self,
2629        fills: Vec<(Price, Quantity)>,
2630        order: &OrderAny,
2631        protection_price: Price,
2632    ) -> Vec<(Price, Quantity)> {
2633        let protection_raw = protection_price.raw;
2634        fills
2635            .into_iter()
2636            .filter(|(fill_price, _)| {
2637                match order.order_side() {
2638                    // BUY: only fill at prices <= protection_price
2639                    OrderSide::Buy => fill_price.raw <= protection_raw,
2640                    // SELL: only fill at prices >= protection_price
2641                    OrderSide::Sell => fill_price.raw >= protection_raw,
2642                    OrderSide::NoOrderSide => false,
2643                }
2644            })
2645            .collect()
2646    }
2647
2648    /// Attempts to fill a limit order against the current order book.
2649    ///
2650    /// Determines fill prices and quantities based on available liquidity,
2651    /// then applies the fills to the order.
2652    ///
2653    /// # Panics
2654    ///
2655    /// Panics if the order has no price (design error).
2656    pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
2657        let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
2658            Some(order) => order,
2659            None => {
2660                log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
2661                return;
2662            }
2663        };
2664
2665        match order.price() {
2666            Some(order_price) => {
2667                let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
2668                if let Some(&qty) = cached_filled_qty
2669                    && qty >= order.quantity()
2670                {
2671                    log::debug!(
2672                        "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
2673                        qty,
2674                        order.quantity(),
2675                        order.filled_qty(),
2676                        order.leaves_qty(),
2677                    );
2678                    return;
2679                }
2680
2681                // Check fill model for MAKER orders at the limit price
2682                if order
2683                    .liquidity_side()
2684                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
2685                {
2686                    // For trade execution: check if trade price equals order price
2687                    // For quote updates: check if bid/ask equals order price
2688                    let at_limit = if self.last_trade_size.is_some() && self.core.last.is_some() {
2689                        self.core.last.is_some_and(|last| last == order_price)
2690                    } else if order.order_side() == OrderSide::Buy {
2691                        self.core.bid.is_some_and(|bid| bid == order_price)
2692                    } else {
2693                        self.core.ask.is_some_and(|ask| ask == order_price)
2694                    };
2695
2696                    if at_limit && !self.fill_model.is_limit_filled() {
2697                        return; // Not filled (simulates queue position)
2698                    }
2699                }
2700
2701                let queue_allowed_raw = if self.config.queue_position {
2702                    match self.determine_trade_fill_qty(&order) {
2703                        None => return,
2704                        Some(0) => return,
2705                        Some(allowed) => Some(allowed),
2706                    }
2707                } else {
2708                    None
2709                };
2710
2711                let venue_position_id = self.ids_generator.get_position_id(&order, None);
2712                let position = if let Some(venue_position_id) = venue_position_id {
2713                    let cache = self.cache.as_ref().borrow();
2714                    cache.position(&venue_position_id).cloned()
2715                } else {
2716                    None
2717                };
2718
2719                if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
2720                    log::warn!(
2721                        "Canceling REDUCE_ONLY {} as would increase position",
2722                        order.order_type()
2723                    );
2724                    self.cancel_order(&order, None);
2725                    return;
2726                }
2727
2728                let tc_before = self.trade_consumption;
2729                let mut fills = self.determine_limit_fill_model_price_and_volume(&order);
2730
2731                if let Some(allowed_raw) = queue_allowed_raw {
2732                    let size_prec = self.instrument.size_precision();
2733                    let mut remaining = allowed_raw;
2734                    fills = fills
2735                        .into_iter()
2736                        .filter_map(|(price, qty)| {
2737                            if remaining == 0 {
2738                                return None;
2739                            }
2740                            let capped = qty.raw.min(remaining);
2741                            remaining -= capped;
2742                            Some((price, Quantity::from_raw(capped, size_prec)))
2743                        })
2744                        .collect();
2745
2746                    // Consume excess and reconcile trade budget after capping
2747                    let consumed: QuantityRaw = fills.iter().map(|(_, qty)| qty.raw).sum();
2748                    if let Some(excess) = self.queue_excess.get_mut(&order.client_order_id()) {
2749                        *excess = excess.saturating_sub(consumed);
2750                    }
2751                    self.trade_consumption = tc_before + consumed;
2752                }
2753
2754                // Skip apply_fills when consumed-liquidity adjustment produces no fills.
2755                // This occurs for partially filled orders when an unrelated delta arrives
2756                // and no new liquidity is available at the order's price level.
2757                if fills.is_empty() && self.config.liquidity_consumption {
2758                    log::debug!(
2759                        "Skipping fill for {}: no liquidity available after consumption",
2760                        order.client_order_id()
2761                    );
2762
2763                    if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
2764                        self.cancel_order(&order, None);
2765                    }
2766
2767                    return;
2768                }
2769
2770                let liquidity_side = order.liquidity_side().unwrap();
2771                self.apply_fills(
2772                    &mut order,
2773                    fills,
2774                    liquidity_side,
2775                    venue_position_id,
2776                    position,
2777                );
2778            }
2779            None => panic!("Limit order must have a price"),
2780        }
2781    }
2782
2783    fn apply_fills(
2784        &mut self,
2785        order: &mut OrderAny,
2786        fills: Vec<(Price, Quantity)>,
2787        liquidity_side: LiquiditySide,
2788        venue_position_id: Option<PositionId>,
2789        position: Option<Position>,
2790    ) {
2791        if order.time_in_force() == TimeInForce::Fok {
2792            let mut total_size = Quantity::zero(order.quantity().precision);
2793            for (fill_px, fill_qty) in &fills {
2794                total_size = total_size.add(*fill_qty);
2795            }
2796
2797            if order.leaves_qty() > total_size {
2798                self.cancel_order(order, None);
2799                return;
2800            }
2801        }
2802
2803        if fills.is_empty() {
2804            if order.status() == OrderStatus::Submitted {
2805                self.generate_order_rejected(
2806                    order,
2807                    format!("No market for {}", order.instrument_id()).into(),
2808                );
2809            } else {
2810                log::error!(
2811                    "Cannot fill order: no fills from book when fills were expected (check size in data)"
2812                );
2813                return;
2814            }
2815        }
2816
2817        // For netting mode, don't use venue position ID (use None instead)
2818        let venue_position_id = if self.oms_type == OmsType::Netting {
2819            None
2820        } else {
2821            venue_position_id
2822        };
2823
2824        let mut initial_market_to_limit_fill = false;
2825
2826        for &(mut fill_px, ref fill_qty) in &fills {
2827            assert!(
2828                (fill_px.precision == self.instrument.price_precision()),
2829                "Invalid price precision for fill price {} when instrument price precision is {}.\
2830                     Check that the data price precision matches the {} instrument",
2831                fill_px.precision,
2832                self.instrument.price_precision(),
2833                self.instrument.id()
2834            );
2835
2836            assert!(
2837                (fill_qty.precision == self.instrument.size_precision()),
2838                "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
2839                     Check that the data quantity precision matches the {} instrument",
2840                fill_qty.precision,
2841                self.instrument.size_precision(),
2842                self.instrument.id()
2843            );
2844
2845            if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
2846                && order.order_type() == OrderType::MarketToLimit
2847            {
2848                self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
2849                initial_market_to_limit_fill = true;
2850            }
2851
2852            if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
2853                fill_px = match order.order_side().as_specified() {
2854                    OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
2855                    OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
2856                }
2857            }
2858
2859            // Check reduce only order
2860            // If the incoming simulated fill would exceed the position when reduce-only is honored,
2861            // clamp the effective fill size to the adjusted (remaining position) quantity.
2862            let mut effective_fill_qty = *fill_qty;
2863
2864            if self.config.use_reduce_only
2865                && order.is_reduce_only()
2866                && let Some(position) = &position
2867                && *fill_qty > position.quantity
2868            {
2869                if position.quantity == Quantity::zero(position.quantity.precision) {
2870                    // Done
2871                    return;
2872                }
2873
2874                // Adjusted target quantity equals the remaining position size
2875                let adjusted_fill_qty =
2876                    Quantity::from_raw(position.quantity.raw, fill_qty.precision);
2877
2878                // Determine the effective fill size for this iteration first
2879                effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
2880
2881                // Only emit an update if the order quantity actually changes
2882                if order.quantity() != adjusted_fill_qty {
2883                    self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
2884                }
2885            }
2886
2887            if fill_qty.is_zero() {
2888                if fills.len() == 1 && order.status() == OrderStatus::Submitted {
2889                    self.generate_order_rejected(
2890                        order,
2891                        format!("No market for {}", order.instrument_id()).into(),
2892                    );
2893                }
2894                return;
2895            }
2896
2897            self.fill_order(
2898                order,
2899                fill_px,
2900                effective_fill_qty,
2901                liquidity_side,
2902                venue_position_id,
2903                position.clone(),
2904            );
2905
2906            if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
2907                // filled initial level
2908                return;
2909            }
2910        }
2911
2912        if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
2913            // IOC order has filled all available size
2914            self.cancel_order(order, None);
2915            return;
2916        }
2917
2918        if order.is_open()
2919            && self.book_type == BookType::L1_MBP
2920            && matches!(
2921                order.order_type(),
2922                OrderType::Market
2923                    | OrderType::MarketIfTouched
2924                    | OrderType::StopMarket
2925                    | OrderType::TrailingStopMarket
2926            )
2927        {
2928            // Exhausted simulated book volume (continue aggressive filling into next level)
2929            // This is a very basic implementation of slipping by a single tick, in the future
2930            // we will implement more detailed fill modeling.
2931            todo!("Exhausted simulated book volume")
2932        }
2933    }
2934
2935    fn fill_order(
2936        &mut self,
2937        order: &mut OrderAny,
2938        last_px: Price,
2939        last_qty: Quantity,
2940        liquidity_side: LiquiditySide,
2941        venue_position_id: Option<PositionId>,
2942        position: Option<Position>,
2943    ) {
2944        self.check_size_precision(last_qty.precision, "fill quantity")
2945            .unwrap();
2946
2947        match self.cached_filled_qty.get(&order.client_order_id()) {
2948            Some(filled_qty) => {
2949                // Use saturating_sub to prevent panic if filled_qty > quantity
2950                let leaves_qty = order.quantity().saturating_sub(*filled_qty);
2951                let last_qty = min(last_qty, leaves_qty);
2952                let new_filled_qty = *filled_qty + last_qty;
2953                // update cached filled qty
2954                self.cached_filled_qty
2955                    .insert(order.client_order_id(), new_filled_qty);
2956            }
2957            None => {
2958                self.cached_filled_qty
2959                    .insert(order.client_order_id(), last_qty);
2960            }
2961        }
2962
2963        // calculate commission
2964        let commission = self
2965            .fee_model
2966            .get_commission(order, last_qty, last_px, &self.instrument)
2967            .unwrap();
2968
2969        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2970        self.generate_order_filled(
2971            order,
2972            venue_order_id,
2973            venue_position_id,
2974            last_qty,
2975            last_px,
2976            self.instrument.quote_currency(),
2977            commission,
2978            liquidity_side,
2979        );
2980
2981        if order.is_passive() && order.is_closed() {
2982            // Check if order exists in OrderMatching core, and delete it if it does
2983            if self.core.order_exists(order.client_order_id()) {
2984                let _ = self.core.delete_order(order.client_order_id());
2985            }
2986            self.cached_filled_qty.remove(&order.client_order_id());
2987        }
2988
2989        if !self.config.support_contingent_orders {
2990            return;
2991        }
2992
2993        if let Some(contingency_type) = order.contingency_type() {
2994            match contingency_type {
2995                ContingencyType::Oto => {
2996                    if let Some(linked_orders_ids) = order.linked_order_ids() {
2997                        for client_order_id in linked_orders_ids {
2998                            let mut child_order = match self.cache.borrow().order(client_order_id) {
2999                                Some(child_order) => child_order.clone(),
3000                                None => panic!("Order {client_order_id} not found in cache"),
3001                            };
3002
3003                            if child_order.is_closed() || child_order.is_active_local() {
3004                                continue;
3005                            }
3006
3007                            // Check if we need to index position id
3008                            if let (None, Some(position_id)) =
3009                                (child_order.position_id(), order.position_id())
3010                            {
3011                                self.cache
3012                                    .borrow_mut()
3013                                    .add_position_id(
3014                                        &position_id,
3015                                        &self.venue,
3016                                        client_order_id,
3017                                        &child_order.strategy_id(),
3018                                    )
3019                                    .unwrap();
3020                                log::debug!(
3021                                    "Added position id {position_id} to cache for order {client_order_id}"
3022                                );
3023                            }
3024
3025                            if (!child_order.is_open())
3026                                || (matches!(child_order.status(), OrderStatus::PendingUpdate)
3027                                    && child_order
3028                                        .previous_status()
3029                                        .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
3030                            {
3031                                let account_id = order.account_id().unwrap_or_else(|| {
3032                                    *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
3033                                        panic!(
3034                                            "Account ID not found for trader {}",
3035                                            order.trader_id()
3036                                        )
3037                                    })
3038                                });
3039                                self.process_order(&mut child_order, account_id);
3040                            }
3041                        }
3042                    } else {
3043                        log::error!(
3044                            "OTO order {} does not have linked orders",
3045                            order.client_order_id()
3046                        );
3047                    }
3048                }
3049                ContingencyType::Oco => {
3050                    if let Some(linked_orders_ids) = order.linked_order_ids() {
3051                        for client_order_id in linked_orders_ids {
3052                            let child_order = match self.cache.borrow().order(client_order_id) {
3053                                Some(child_order) => child_order.clone(),
3054                                None => panic!("Order {client_order_id} not found in cache"),
3055                            };
3056
3057                            if child_order.is_closed() || child_order.is_active_local() {
3058                                continue;
3059                            }
3060
3061                            self.cancel_order(&child_order, None);
3062                        }
3063                    } else {
3064                        log::error!(
3065                            "OCO order {} does not have linked orders",
3066                            order.client_order_id()
3067                        );
3068                    }
3069                }
3070                ContingencyType::Ouo => {
3071                    if let Some(linked_orders_ids) = order.linked_order_ids() {
3072                        for client_order_id in linked_orders_ids {
3073                            let mut child_order = match self.cache.borrow().order(client_order_id) {
3074                                Some(child_order) => child_order.clone(),
3075                                None => panic!("Order {client_order_id} not found in cache"),
3076                            };
3077
3078                            if child_order.is_active_local() {
3079                                continue;
3080                            }
3081
3082                            if order.is_closed() && child_order.is_open() {
3083                                self.cancel_order(&child_order, None);
3084                            } else if !order.leaves_qty().is_zero()
3085                                && order.leaves_qty() != child_order.leaves_qty()
3086                            {
3087                                let price = child_order.price();
3088                                let trigger_price = child_order.trigger_price();
3089                                self.update_order(
3090                                    &mut child_order,
3091                                    Some(order.leaves_qty()),
3092                                    price,
3093                                    trigger_price,
3094                                    Some(false),
3095                                );
3096                            }
3097                        }
3098                    } else {
3099                        log::error!(
3100                            "OUO order {} does not have linked orders",
3101                            order.client_order_id()
3102                        );
3103                    }
3104                }
3105                _ => {}
3106            }
3107        }
3108    }
3109
3110    fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
3111        if self
3112            .core
3113            .is_limit_matched(order.order_side_specified(), price)
3114        {
3115            if order.is_post_only() {
3116                self.generate_order_modify_rejected(
3117                    order.trader_id(),
3118                    order.strategy_id(),
3119                    order.instrument_id(),
3120                    order.client_order_id(),
3121                    Ustr::from(format!(
3122                        "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3123                        order.order_type(),
3124                        order.order_side(),
3125                        price,
3126                        self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3127                        self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3128                    ).as_str()),
3129                    order.venue_order_id(),
3130                    order.account_id(),
3131                );
3132                return;
3133            }
3134
3135            self.generate_order_updated(order, quantity, Some(price), None, None);
3136
3137            // Re-read from cache to get the order with events applied
3138            let client_order_id = order.client_order_id();
3139            if let Some(cached) = self.cache.borrow_mut().mut_order(&client_order_id) {
3140                cached.set_liquidity_side(LiquiditySide::Taker);
3141            }
3142            self.fill_limit_order(client_order_id);
3143            return;
3144        }
3145        self.generate_order_updated(order, quantity, Some(price), None, None);
3146    }
3147
3148    fn update_stop_market_order(
3149        &mut self,
3150        order: &mut OrderAny,
3151        quantity: Quantity,
3152        trigger_price: Price,
3153    ) {
3154        if self
3155            .core
3156            .is_stop_matched(order.order_side_specified(), trigger_price)
3157        {
3158            self.generate_order_modify_rejected(
3159                order.trader_id(),
3160                order.strategy_id(),
3161                order.instrument_id(),
3162                order.client_order_id(),
3163                Ustr::from(
3164                    format!(
3165                        "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3166                        order.order_type(),
3167                        order.order_side(),
3168                        trigger_price,
3169                        self.core
3170                            .bid
3171                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3172                        self.core
3173                            .ask
3174                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3175                    )
3176                    .as_str(),
3177                ),
3178                order.venue_order_id(),
3179                order.account_id(),
3180            );
3181            return;
3182        }
3183
3184        self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
3185    }
3186
3187    fn update_stop_limit_order(
3188        &mut self,
3189        order: &mut OrderAny,
3190        quantity: Quantity,
3191        price: Price,
3192        trigger_price: Price,
3193    ) {
3194        if order.is_triggered().is_some_and(|t| t) {
3195            // Update limit price
3196            if self
3197                .core
3198                .is_limit_matched(order.order_side_specified(), price)
3199            {
3200                if order.is_post_only() {
3201                    self.generate_order_modify_rejected(
3202                        order.trader_id(),
3203                        order.strategy_id(),
3204                        order.instrument_id(),
3205                        order.client_order_id(),
3206                        Ustr::from(format!(
3207                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3208                            order.order_type(),
3209                            order.order_side(),
3210                            price,
3211                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3212                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3213                        ).as_str()),
3214                        order.venue_order_id(),
3215                        order.account_id(),
3216                    );
3217                    return;
3218                }
3219                self.generate_order_updated(order, quantity, Some(price), None, None);
3220                order.set_liquidity_side(LiquiditySide::Taker);
3221                if let Err(e) = self
3222                    .cache
3223                    .borrow_mut()
3224                    .add_order(order.clone(), None, None, false)
3225                {
3226                    log::debug!("Order already in cache: {e}");
3227                }
3228                self.fill_limit_order(order.client_order_id());
3229                return; // Filled
3230            }
3231        } else {
3232            // Update stop price
3233            if self
3234                .core
3235                .is_stop_matched(order.order_side_specified(), trigger_price)
3236            {
3237                self.generate_order_modify_rejected(
3238                    order.trader_id(),
3239                    order.strategy_id(),
3240                    order.instrument_id(),
3241                    order.client_order_id(),
3242                    Ustr::from(
3243                        format!(
3244                            "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3245                            order.order_type(),
3246                            order.order_side(),
3247                            trigger_price,
3248                            self.core
3249                                .bid
3250                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
3251                            self.core
3252                                .ask
3253                                .map_or_else(|| "None".to_string(), |p| p.to_string())
3254                        )
3255                        .as_str(),
3256                    ),
3257                    order.venue_order_id(),
3258                    order.account_id(),
3259                );
3260                return;
3261            }
3262        }
3263
3264        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
3265    }
3266
3267    fn update_market_if_touched_order(
3268        &mut self,
3269        order: &mut OrderAny,
3270        quantity: Quantity,
3271        trigger_price: Price,
3272    ) {
3273        if self
3274            .core
3275            .is_touch_triggered(order.order_side_specified(), trigger_price)
3276        {
3277            self.generate_order_modify_rejected(
3278                order.trader_id(),
3279                order.strategy_id(),
3280                order.instrument_id(),
3281                order.client_order_id(),
3282                Ustr::from(
3283                    format!(
3284                        "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
3285                        order.order_type(),
3286                        order.order_side(),
3287                        trigger_price,
3288                        self.core
3289                            .bid
3290                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3291                        self.core
3292                            .ask
3293                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3294                    )
3295                    .as_str(),
3296                ),
3297                order.venue_order_id(),
3298                order.account_id(),
3299            );
3300            // Cannot update order
3301            return;
3302        }
3303
3304        self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
3305    }
3306
3307    fn update_limit_if_touched_order(
3308        &mut self,
3309        order: &mut OrderAny,
3310        quantity: Quantity,
3311        price: Price,
3312        trigger_price: Price,
3313    ) {
3314        if order.is_triggered().is_some_and(|t| t) {
3315            // Update limit price
3316            if self
3317                .core
3318                .is_limit_matched(order.order_side_specified(), price)
3319            {
3320                if order.is_post_only() {
3321                    self.generate_order_modify_rejected(
3322                        order.trader_id(),
3323                        order.strategy_id(),
3324                        order.instrument_id(),
3325                        order.client_order_id(),
3326                        Ustr::from(format!(
3327                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3328                            order.order_type(),
3329                            order.order_side(),
3330                            price,
3331                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3332                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3333                        ).as_str()),
3334                        order.venue_order_id(),
3335                        order.account_id(),
3336                    );
3337                    // Cannot update order
3338                    return;
3339                }
3340                self.generate_order_updated(order, quantity, Some(price), None, None);
3341                order.set_liquidity_side(LiquiditySide::Taker);
3342                self.fill_limit_order(order.client_order_id());
3343                return;
3344            }
3345        } else {
3346            // Update trigger price
3347            if self
3348                .core
3349                .is_touch_triggered(order.order_side_specified(), trigger_price)
3350            {
3351                self.generate_order_modify_rejected(
3352                    order.trader_id(),
3353                    order.strategy_id(),
3354                    order.instrument_id(),
3355                    order.client_order_id(),
3356                    Ustr::from(
3357                        format!(
3358                            "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
3359                            order.order_type(),
3360                            order.order_side(),
3361                            trigger_price,
3362                            self.core
3363                                .bid
3364                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
3365                            self.core
3366                                .ask
3367                                .map_or_else(|| "None".to_string(), |p| p.to_string())
3368                        )
3369                        .as_str(),
3370                    ),
3371                    order.venue_order_id(),
3372                    order.account_id(),
3373                );
3374                return;
3375            }
3376        }
3377
3378        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
3379    }
3380
3381    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
3382        let (new_trigger_price, new_price) = trailing_stop_calculate(
3383            self.instrument.price_increment(),
3384            order.trigger_price(),
3385            order.activation_price(),
3386            order,
3387            self.core.bid,
3388            self.core.ask,
3389            self.core.last,
3390        )
3391        .unwrap();
3392
3393        if new_trigger_price.is_none() && new_price.is_none() {
3394            return;
3395        }
3396
3397        self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
3398    }
3399
3400    // -- EVENT HANDLING -----------------------------------------------------
3401
3402    fn accept_order(&mut self, order: &mut OrderAny) {
3403        if order.is_closed() {
3404            // Temporary guard to prevent invalid processing
3405            return;
3406        }
3407        if order.status() != OrderStatus::Accepted {
3408            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3409            self.generate_order_accepted(order, venue_order_id);
3410
3411            if matches!(
3412                order.order_type(),
3413                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
3414            ) && order.trigger_price().is_none()
3415            {
3416                self.update_trailing_stop_order(order);
3417            }
3418        }
3419
3420        let match_info = OrderMatchInfo::new(
3421            order.client_order_id(),
3422            order.order_side().as_specified(),
3423            order.order_type(),
3424            order.trigger_price(),
3425            order.price(),
3426            match order {
3427                OrderAny::TrailingStopMarket(o) => o.is_activated,
3428                OrderAny::TrailingStopLimit(o) => o.is_activated,
3429                _ => true,
3430            },
3431        );
3432        self.core.add_order(match_info);
3433    }
3434
3435    fn expire_order(&mut self, order: &OrderAny) {
3436        if self.config.support_contingent_orders
3437            && order
3438                .contingency_type()
3439                .is_some_and(|c| c != ContingencyType::NoContingency)
3440        {
3441            self.cancel_contingent_orders(order);
3442        }
3443
3444        self.generate_order_expired(order);
3445    }
3446
3447    fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
3448        let cancel_contingencies = cancel_contingencies.unwrap_or(true);
3449        if order.is_active_local() {
3450            log::error!(
3451                "Cannot cancel an order with {} from the matching engine",
3452                order.status()
3453            );
3454            return;
3455        }
3456
3457        // Check if order exists in OrderMatching core, and delete it if it does
3458        if self.core.order_exists(order.client_order_id()) {
3459            let _ = self.core.delete_order(order.client_order_id());
3460        }
3461        self.cached_filled_qty.remove(&order.client_order_id());
3462
3463        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3464        self.generate_order_canceled(order, venue_order_id);
3465
3466        if self.config.support_contingent_orders
3467            && order.contingency_type().is_some()
3468            && order.contingency_type().unwrap() != ContingencyType::NoContingency
3469            && cancel_contingencies
3470        {
3471            self.cancel_contingent_orders(order);
3472        }
3473    }
3474
3475    fn update_order(
3476        &mut self,
3477        order: &mut OrderAny,
3478        quantity: Option<Quantity>,
3479        price: Option<Price>,
3480        trigger_price: Option<Price>,
3481        update_contingencies: Option<bool>,
3482    ) -> bool {
3483        let update_contingencies = update_contingencies.unwrap_or(true);
3484        let quantity = quantity.unwrap_or(order.quantity());
3485
3486        let price_prec = self.instrument.price_precision();
3487        let size_prec = self.instrument.size_precision();
3488        let instrument_id = self.instrument.id();
3489        if quantity.precision != size_prec {
3490            self.generate_order_modify_rejected(
3491                order.trader_id(),
3492                order.strategy_id(),
3493                order.instrument_id(),
3494                order.client_order_id(),
3495                Ustr::from(&format!(
3496                    "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
3497                    quantity.precision
3498                )),
3499                order.venue_order_id(),
3500                order.account_id(),
3501            );
3502            return false;
3503        }
3504        if let Some(px) = price
3505            && px.precision != price_prec
3506        {
3507            self.generate_order_modify_rejected(
3508                order.trader_id(),
3509                order.strategy_id(),
3510                order.instrument_id(),
3511                order.client_order_id(),
3512                Ustr::from(&format!(
3513                    "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
3514                    px.precision
3515                )),
3516                order.venue_order_id(),
3517                order.account_id(),
3518            );
3519            return false;
3520        }
3521        if let Some(tp) = trigger_price
3522            && tp.precision != price_prec
3523        {
3524            self.generate_order_modify_rejected(
3525                order.trader_id(),
3526                order.strategy_id(),
3527                order.instrument_id(),
3528                order.client_order_id(),
3529                Ustr::from(&format!(
3530                    "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
3531                    tp.precision
3532                )),
3533                order.venue_order_id(),
3534                order.account_id(),
3535            );
3536            return false;
3537        }
3538
3539        // Use cached_filled_qty since PassiveOrderAny in core is not updated with fills
3540        let filled_qty = self
3541            .cached_filled_qty
3542            .get(&order.client_order_id())
3543            .copied()
3544            .unwrap_or(order.filled_qty());
3545        if quantity < filled_qty {
3546            self.generate_order_modify_rejected(
3547                order.trader_id(),
3548                order.strategy_id(),
3549                order.instrument_id(),
3550                order.client_order_id(),
3551                Ustr::from(&format!(
3552                    "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
3553                )),
3554                order.venue_order_id(),
3555                order.account_id(),
3556            );
3557            return false;
3558        }
3559
3560        match order {
3561            OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
3562                let price = price.unwrap_or(order.price().unwrap());
3563                self.update_limit_order(order, quantity, price);
3564            }
3565            OrderAny::StopMarket(_) => {
3566                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3567                self.update_stop_market_order(order, quantity, trigger_price);
3568            }
3569            OrderAny::StopLimit(_) => {
3570                let price = price.unwrap_or(order.price().unwrap());
3571                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3572                self.update_stop_limit_order(order, quantity, price, trigger_price);
3573            }
3574            OrderAny::MarketIfTouched(_) => {
3575                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3576                self.update_market_if_touched_order(order, quantity, trigger_price);
3577            }
3578            OrderAny::LimitIfTouched(_) => {
3579                let price = price.unwrap_or(order.price().unwrap());
3580                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3581                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
3582            }
3583            OrderAny::TrailingStopMarket(_) => {
3584                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
3585                self.update_market_if_touched_order(order, quantity, trigger_price);
3586            }
3587            OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
3588                let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
3589                let trigger_price =
3590                    trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
3591                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
3592            }
3593            _ => {
3594                panic!(
3595                    "Unsupported order type {} for update_order",
3596                    order.order_type()
3597                );
3598            }
3599        }
3600
3601        // If order now has zero leaves after update, cancel it
3602        let new_leaves_qty = quantity.saturating_sub(filled_qty);
3603        if new_leaves_qty.is_zero() {
3604            if self.config.support_contingent_orders
3605                && order
3606                    .contingency_type()
3607                    .is_some_and(|c| c != ContingencyType::NoContingency)
3608                && update_contingencies
3609            {
3610                self.update_contingent_order(order);
3611            }
3612            // Pass false since we already handled contingents above
3613            self.cancel_order(order, Some(false));
3614            return true;
3615        }
3616
3617        if self.config.support_contingent_orders
3618            && order
3619                .contingency_type()
3620                .is_some_and(|c| c != ContingencyType::NoContingency)
3621            && update_contingencies
3622        {
3623            self.update_contingent_order(order);
3624        }
3625
3626        true
3627    }
3628
3629    /// Triggers a stop order, converting it to an active market or limit order.
3630    pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
3631        let order = match self.cache.borrow().order(&client_order_id).cloned() {
3632            Some(order) => order,
3633            None => {
3634                log::error!(
3635                    "Cannot trigger stop order: order {client_order_id} not found in cache"
3636                );
3637                return;
3638            }
3639        };
3640
3641        match order.order_type() {
3642            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
3643                self.fill_limit_order(client_order_id);
3644            }
3645            OrderType::StopMarket | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
3646                self.fill_market_order(client_order_id);
3647            }
3648            _ => {
3649                log::error!(
3650                    "Cannot trigger stop order: invalid order type {}",
3651                    order.order_type()
3652                );
3653            }
3654        }
3655    }
3656
3657    fn update_contingent_order(&mut self, order: &OrderAny) {
3658        log::debug!("Updating OUO orders from {}", order.client_order_id());
3659        if let Some(linked_order_ids) = order.linked_order_ids() {
3660            let parent_filled_qty = self
3661                .cached_filled_qty
3662                .get(&order.client_order_id())
3663                .copied()
3664                .unwrap_or(order.filled_qty());
3665            let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
3666
3667            for client_order_id in linked_order_ids {
3668                let mut child_order = match self.cache.borrow().order(client_order_id) {
3669                    Some(order) => order.clone(),
3670                    None => panic!("Order {client_order_id} not found in cache."),
3671                };
3672
3673                if child_order.is_active_local() {
3674                    continue;
3675                }
3676
3677                let child_filled_qty = self
3678                    .cached_filled_qty
3679                    .get(&child_order.client_order_id())
3680                    .copied()
3681                    .unwrap_or(child_order.filled_qty());
3682
3683                if parent_leaves_qty.is_zero() {
3684                    self.cancel_order(&child_order, Some(false));
3685                } else if child_filled_qty >= parent_leaves_qty {
3686                    // Child already filled beyond parent's remaining qty, cancel it
3687                    self.cancel_order(&child_order, Some(false));
3688                } else {
3689                    let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
3690                    if child_leaves_qty != parent_leaves_qty {
3691                        let price = child_order.price();
3692                        let trigger_price = child_order.trigger_price();
3693                        self.update_order(
3694                            &mut child_order,
3695                            Some(parent_leaves_qty),
3696                            price,
3697                            trigger_price,
3698                            Some(false),
3699                        );
3700                    }
3701                }
3702            }
3703        }
3704    }
3705
3706    fn cancel_contingent_orders(&mut self, order: &OrderAny) {
3707        if let Some(linked_order_ids) = order.linked_order_ids() {
3708            for client_order_id in linked_order_ids {
3709                let contingent_order = match self.cache.borrow().order(client_order_id) {
3710                    Some(order) => order.clone(),
3711                    None => panic!("Cannot find contingent order for {client_order_id}"),
3712                };
3713                if contingent_order.is_active_local() {
3714                    // order is not on the exchange yet
3715                    continue;
3716                }
3717                if !contingent_order.is_closed() {
3718                    self.cancel_order(&contingent_order, Some(false));
3719                }
3720            }
3721        }
3722    }
3723
3724    // -- EVENT GENERATORS -----------------------------------------------------
3725
3726    fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
3727        let ts_now = self.clock.borrow().timestamp_ns();
3728        let account_id = order
3729            .account_id()
3730            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3731
3732        // Check if rejection is due to post-only
3733        let due_post_only = reason.as_str().starts_with("POST_ONLY");
3734
3735        let event = OrderEventAny::Rejected(OrderRejected::new(
3736            order.trader_id(),
3737            order.strategy_id(),
3738            order.instrument_id(),
3739            order.client_order_id(),
3740            account_id,
3741            reason,
3742            UUID4::new(),
3743            ts_now,
3744            ts_now,
3745            false,
3746            due_post_only,
3747        ));
3748        let endpoint = MessagingSwitchboard::exec_engine_process();
3749        msgbus::send_order_event(endpoint, event);
3750    }
3751
3752    fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
3753        let ts_now = self.clock.borrow().timestamp_ns();
3754        let account_id = order
3755            .account_id()
3756            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3757        let event = OrderEventAny::Accepted(OrderAccepted::new(
3758            order.trader_id(),
3759            order.strategy_id(),
3760            order.instrument_id(),
3761            order.client_order_id(),
3762            venue_order_id,
3763            account_id,
3764            UUID4::new(),
3765            ts_now,
3766            ts_now,
3767            false,
3768        ));
3769
3770        let endpoint = MessagingSwitchboard::exec_engine_process();
3771        msgbus::send_order_event(endpoint, event);
3772    }
3773
3774    #[allow(clippy::too_many_arguments)]
3775    fn generate_order_modify_rejected(
3776        &self,
3777        trader_id: TraderId,
3778        strategy_id: StrategyId,
3779        instrument_id: InstrumentId,
3780        client_order_id: ClientOrderId,
3781        reason: Ustr,
3782        venue_order_id: Option<VenueOrderId>,
3783        account_id: Option<AccountId>,
3784    ) {
3785        let ts_now = self.clock.borrow().timestamp_ns();
3786        let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
3787            trader_id,
3788            strategy_id,
3789            instrument_id,
3790            client_order_id,
3791            reason,
3792            UUID4::new(),
3793            ts_now,
3794            ts_now,
3795            false,
3796            venue_order_id,
3797            account_id,
3798        ));
3799        let endpoint = MessagingSwitchboard::exec_engine_process();
3800        msgbus::send_order_event(endpoint, event);
3801    }
3802
3803    #[allow(clippy::too_many_arguments)]
3804    fn generate_order_cancel_rejected(
3805        &self,
3806        trader_id: TraderId,
3807        strategy_id: StrategyId,
3808        account_id: AccountId,
3809        instrument_id: InstrumentId,
3810        client_order_id: ClientOrderId,
3811        venue_order_id: Option<VenueOrderId>,
3812        reason: Ustr,
3813    ) {
3814        let ts_now = self.clock.borrow().timestamp_ns();
3815        let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
3816            trader_id,
3817            strategy_id,
3818            instrument_id,
3819            client_order_id,
3820            reason,
3821            UUID4::new(),
3822            ts_now,
3823            ts_now,
3824            false,
3825            venue_order_id,
3826            Some(account_id),
3827        ));
3828        let endpoint = MessagingSwitchboard::exec_engine_process();
3829        msgbus::send_order_event(endpoint, event);
3830    }
3831
3832    fn generate_order_updated(
3833        &self,
3834        order: &mut OrderAny,
3835        quantity: Quantity,
3836        price: Option<Price>,
3837        trigger_price: Option<Price>,
3838        protection_price: Option<Price>,
3839    ) {
3840        let ts_now = self.clock.borrow().timestamp_ns();
3841        let event = OrderEventAny::Updated(OrderUpdated::new(
3842            order.trader_id(),
3843            order.strategy_id(),
3844            order.instrument_id(),
3845            order.client_order_id(),
3846            quantity,
3847            UUID4::new(),
3848            ts_now,
3849            ts_now,
3850            false,
3851            order.venue_order_id(),
3852            order.account_id(),
3853            price,
3854            trigger_price,
3855            protection_price,
3856        ));
3857
3858        let endpoint = MessagingSwitchboard::exec_engine_process();
3859        msgbus::send_order_event(endpoint, event);
3860    }
3861
3862    fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
3863        let ts_now = self.clock.borrow().timestamp_ns();
3864        let event = OrderEventAny::Canceled(OrderCanceled::new(
3865            order.trader_id(),
3866            order.strategy_id(),
3867            order.instrument_id(),
3868            order.client_order_id(),
3869            UUID4::new(),
3870            ts_now,
3871            ts_now,
3872            false,
3873            Some(venue_order_id),
3874            order.account_id(),
3875        ));
3876        let endpoint = MessagingSwitchboard::exec_engine_process();
3877        msgbus::send_order_event(endpoint, event);
3878    }
3879
3880    fn generate_order_triggered(&self, order: &OrderAny) {
3881        let ts_now = self.clock.borrow().timestamp_ns();
3882        let event = OrderEventAny::Triggered(OrderTriggered::new(
3883            order.trader_id(),
3884            order.strategy_id(),
3885            order.instrument_id(),
3886            order.client_order_id(),
3887            UUID4::new(),
3888            ts_now,
3889            ts_now,
3890            false,
3891            order.venue_order_id(),
3892            order.account_id(),
3893        ));
3894        let endpoint = MessagingSwitchboard::exec_engine_process();
3895        msgbus::send_order_event(endpoint, event);
3896    }
3897
3898    fn generate_order_expired(&self, order: &OrderAny) {
3899        let ts_now = self.clock.borrow().timestamp_ns();
3900        let event = OrderEventAny::Expired(OrderExpired::new(
3901            order.trader_id(),
3902            order.strategy_id(),
3903            order.instrument_id(),
3904            order.client_order_id(),
3905            UUID4::new(),
3906            ts_now,
3907            ts_now,
3908            false,
3909            order.venue_order_id(),
3910            order.account_id(),
3911        ));
3912        let endpoint = MessagingSwitchboard::exec_engine_process();
3913        msgbus::send_order_event(endpoint, event);
3914    }
3915
3916    #[allow(clippy::too_many_arguments)]
3917    fn generate_order_filled(
3918        &mut self,
3919        order: &mut OrderAny,
3920        venue_order_id: VenueOrderId,
3921        venue_position_id: Option<PositionId>,
3922        last_qty: Quantity,
3923        last_px: Price,
3924        quote_currency: Currency,
3925        commission: Money,
3926        liquidity_side: LiquiditySide,
3927    ) {
3928        debug_assert!(
3929            last_qty <= order.quantity(),
3930            "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
3931            order_qty = order.quantity(),
3932            client_order_id = order.client_order_id()
3933        );
3934
3935        let ts_now = self.clock.borrow().timestamp_ns();
3936        let account_id = order
3937            .account_id()
3938            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3939        let event = OrderEventAny::Filled(OrderFilled::new(
3940            order.trader_id(),
3941            order.strategy_id(),
3942            order.instrument_id(),
3943            order.client_order_id(),
3944            venue_order_id,
3945            account_id,
3946            self.ids_generator.generate_trade_id(),
3947            order.order_side(),
3948            order.order_type(),
3949            last_qty,
3950            last_px,
3951            quote_currency,
3952            liquidity_side,
3953            UUID4::new(),
3954            ts_now,
3955            ts_now,
3956            false,
3957            venue_position_id,
3958            Some(commission),
3959        ));
3960
3961        let endpoint = MessagingSwitchboard::exec_engine_process();
3962        msgbus::send_order_event(endpoint, event);
3963    }
3964}