nautilus_execution/matching_engine/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    any::Any,
22    cell::RefCell,
23    cmp::min,
24    collections::HashMap,
25    ops::{Add, Sub},
26    rc::Rc,
27};
28
29use chrono::TimeDelta;
30use nautilus_common::{
31    cache::Cache,
32    clock::Clock,
33    msgbus::{self},
34};
35use nautilus_core::{UUID4, UnixNanos};
36use nautilus_model::{
37    data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
38    enums::{
39        AccountType, AggregationSource, AggressorSide, BarAggregation, BookType, ContingencyType,
40        LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
41        OrderStatus, OrderType, PriceType, TimeInForce,
42    },
43    events::{
44        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
45        OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
46    },
47    identifiers::{
48        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
49        VenueOrderId,
50    },
51    instruments::{EXPIRING_INSTRUMENT_TYPES, Instrument, InstrumentAny},
52    orderbook::OrderBook,
53    orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
54    position::Position,
55    types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
56};
57use ustr::Ustr;
58
59use crate::{
60    matching_core::OrderMatchingCore,
61    matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
62    messages::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
63    models::{
64        fee::{FeeModel, FeeModelAny},
65        fill::FillModel,
66    },
67    trailing::trailing_stop_calculate,
68};
69
/// An order matching engine for a single market.
///
/// Bundles the order book, the matching core, and the fill/fee models for one instrument,
/// together with shared handles to the clock and the cache.
pub struct OrderMatchingEngine {
    /// The venue for the matching engine.
    pub venue: Venue,
    /// The instrument for the matching engine.
    pub instrument: InstrumentAny,
    /// The instrument's raw integer ID for the venue.
    pub raw_id: u32,
    /// The order book type for the matching engine.
    pub book_type: BookType,
    /// The order management system (OMS) type for the matching engine.
    pub oms_type: OmsType,
    /// The account type for the matching engine.
    pub account_type: AccountType,
    /// The market status for the matching engine.
    pub market_status: MarketStatus,
    /// The config for the matching engine.
    pub config: OrderMatchingEngineConfig,
    /// Shared clock, read for the current timestamp when validating orders.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache, consulted for order and position lookups.
    cache: Rc<RefCell<Cache>>,
    /// The order book maintained from the market data processed by the engine.
    book: OrderBook,
    /// The matching core holding bid/ask/last state and the working orders.
    pub core: OrderMatchingCore,
    /// The fill model (replaceable through `set_fill_model`).
    fill_model: FillModel,
    /// The fee model.
    fee_model: FeeModelAny,
    // Target prices; cleared on `reset`.
    // NOTE(review): their use is outside the visible part of this file.
    target_bid: Option<Price>,
    target_ask: Option<Price>,
    target_last: Option<Price>,
    /// The most recent BID bar awaiting its matching ASK bar.
    last_bar_bid: Option<Bar>,
    /// The most recent ASK bar awaiting its matching BID bar.
    last_bar_ask: Option<Bar>,
    /// The bar type currently used for bar execution, per instrument.
    execution_bar_types: HashMap<InstrumentId, BarType>,
    /// Cached time deltas of the bar types seen during bar execution.
    execution_bar_deltas: HashMap<BarType, TimeDelta>,
    /// The account ID indexed by trader ID (populated when orders are processed).
    account_ids: HashMap<TraderId, AccountId>,
    /// Filled quantities cached by client order ID.
    cached_filled_qty: HashMap<ClientOrderId, Quantity>,
    /// Generator for venue-side identifiers such as trade IDs.
    ids_generator: IdsGenerator,
}
105
106impl OrderMatchingEngine {
107    #[allow(clippy::too_many_arguments)]
108    pub fn new(
109        instrument: InstrumentAny,
110        raw_id: u32,
111        fill_model: FillModel,
112        fee_model: FeeModelAny,
113        book_type: BookType,
114        oms_type: OmsType,
115        account_type: AccountType,
116        clock: Rc<RefCell<dyn Clock>>,
117        cache: Rc<RefCell<Cache>>,
118        config: OrderMatchingEngineConfig,
119    ) -> Self {
120        let book = OrderBook::new(instrument.id(), book_type);
121        let core = OrderMatchingCore::new(
122            instrument.id(),
123            instrument.price_increment(),
124            None, // TBD (will be a function on the engine)
125            None, // TBD (will be a function on the engine)
126            None, // TBD (will be a function on the engine)
127        );
128        let ids_generator = IdsGenerator::new(
129            instrument.id().venue,
130            oms_type,
131            raw_id,
132            config.use_random_ids,
133            config.use_position_ids,
134            cache.clone(),
135        );
136
137        Self {
138            venue: instrument.id().venue,
139            instrument,
140            raw_id,
141            fill_model,
142            fee_model,
143            book_type,
144            oms_type,
145            account_type,
146            clock,
147            cache,
148            book,
149            core,
150            market_status: MarketStatus::Open,
151            config,
152            target_bid: None,
153            target_ask: None,
154            target_last: None,
155            last_bar_bid: None,
156            last_bar_ask: None,
157            execution_bar_types: HashMap::new(),
158            execution_bar_deltas: HashMap::new(),
159            account_ids: HashMap::new(),
160            cached_filled_qty: HashMap::new(),
161            ids_generator,
162        }
163    }
164
165    pub fn reset(&mut self) {
166        self.book.clear(0, UnixNanos::default());
167        self.execution_bar_types.clear();
168        self.execution_bar_deltas.clear();
169        self.account_ids.clear();
170        self.cached_filled_qty.clear();
171        self.core.reset();
172        self.target_bid = None;
173        self.target_ask = None;
174        self.target_last = None;
175        self.ids_generator.reset();
176
177        log::info!("Reset {}", self.instrument.id());
178    }
179
    /// Replaces the engine's fill model with the given `fill_model`.
    pub const fn set_fill_model(&mut self, fill_model: FillModel) {
        self.fill_model = fill_model;
    }
183
    /// Returns the best bid price reported by the engine's order book, if any.
    #[must_use]
    pub fn best_bid_price(&self) -> Option<Price> {
        self.book.best_bid_price()
    }
188
    /// Returns the best ask price reported by the engine's order book, if any.
    #[must_use]
    pub fn best_ask_price(&self) -> Option<Price> {
        self.book.best_ask_price()
    }
193
    /// Returns a reference to the engine's order book.
    #[must_use]
    pub const fn get_book(&self) -> &OrderBook {
        &self.book
    }
198
    /// Returns the bid-side orders held by the matching core.
    #[must_use]
    pub fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
        self.core.get_orders_bid()
    }
203
    /// Returns the ask-side orders held by the matching core.
    #[must_use]
    pub fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
        self.core.get_orders_ask()
    }
208
209    #[must_use]
210    pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
211        // Get orders from both open bid orders and open ask orders
212        let mut orders = Vec::new();
213        orders.extend_from_slice(self.core.get_orders_bid());
214        orders.extend_from_slice(self.core.get_orders_ask());
215        orders
216    }
217
    /// Returns whether the matching core holds an order with the given `client_order_id`.
    #[must_use]
    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
        self.core.order_exists(client_order_id)
    }
222
223    // -- DATA PROCESSING -------------------------------------------------------------------------
224
225    /// Process the venues market for the given order book delta.
226    pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) {
227        log::debug!("Processing {delta}");
228
229        if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
230            self.book.apply_delta(delta);
231        }
232
233        self.iterate(delta.ts_event);
234    }
235
236    pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
237        log::debug!("Processing {deltas}");
238
239        if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
240            self.book.apply_deltas(deltas);
241        }
242
243        self.iterate(deltas.ts_event);
244    }
245
246    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
247        log::debug!("Processing {quote}");
248
249        if self.book_type == BookType::L1_MBP {
250            self.book.update_quote_tick(quote).unwrap();
251        }
252
253        self.iterate(quote.ts_event);
254    }
255
256    pub fn process_bar(&mut self, bar: &Bar) {
257        log::debug!("Processing {bar}");
258
259        // Check if configured for bar execution can only process an L1 book with bars
260        if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
261            return;
262        }
263
264        let bar_type = bar.bar_type;
265        // Do not process internally aggregated bars
266        if bar_type.aggregation_source() == AggregationSource::Internal {
267            return;
268        }
269
270        // Do not process monthly bars (no `timedelta` available)
271        if bar_type.spec().aggregation == BarAggregation::Month {
272            return;
273        }
274
275        let execution_bar_type =
276            if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
277                execution_bar_type.to_owned()
278            } else {
279                self.execution_bar_types
280                    .insert(bar.instrument_id(), bar_type);
281                self.execution_bar_deltas
282                    .insert(bar_type, bar_type.spec().timedelta());
283                bar_type
284            };
285
286        if execution_bar_type != bar_type {
287            let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
288            if bar_type_timedelta.is_none() {
289                bar_type_timedelta = Some(bar_type.spec().timedelta());
290                self.execution_bar_deltas
291                    .insert(bar_type, bar_type_timedelta.unwrap());
292            }
293            if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
294                >= &bar_type_timedelta.unwrap()
295            {
296                self.execution_bar_types
297                    .insert(bar_type.instrument_id(), bar_type);
298            } else {
299                return;
300            }
301        }
302
303        match bar_type.spec().price_type {
304            PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
305            PriceType::Bid => {
306                self.last_bar_bid = Some(bar.to_owned());
307                self.process_quote_ticks_from_bar(bar);
308            }
309            PriceType::Ask => {
310                self.last_bar_ask = Some(bar.to_owned());
311                self.process_quote_ticks_from_bar(bar);
312            }
313            PriceType::Mark => panic!("Not implemented"),
314        }
315    }
316
317    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
318        // Split the bar into 4 trades with quarter volume
319        let size = Quantity::new(bar.volume.as_f64() / 4.0, bar.volume.precision);
320        let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
321        {
322            AggressorSide::Buyer
323        } else {
324            AggressorSide::Seller
325        };
326
327        // Create reusable trade tick
328        let mut trade_tick = TradeTick::new(
329            bar.instrument_id(),
330            bar.open,
331            size,
332            aggressor_side,
333            self.ids_generator.generate_trade_id(),
334            bar.ts_event,
335            bar.ts_event,
336        );
337
338        // Open
339        // Check if not initialized, if it is, it will be updated by the close or last
340        if !self.core.is_last_initialized {
341            self.book.update_trade_tick(&trade_tick).unwrap();
342            self.iterate(trade_tick.ts_init);
343            self.core.set_last_raw(trade_tick.price);
344        }
345
346        // High
347        // Check if higher than last
348        if self.core.last.is_some_and(|last| bar.high > last) {
349            trade_tick.price = bar.high;
350            trade_tick.aggressor_side = AggressorSide::Buyer;
351            trade_tick.trade_id = self.ids_generator.generate_trade_id();
352
353            self.book.update_trade_tick(&trade_tick).unwrap();
354            self.iterate(trade_tick.ts_init);
355
356            self.core.set_last_raw(trade_tick.price);
357        }
358
359        // Low
360        // Check if lower than last
361        // Assumption: market traded down, aggressor hitting the bid(setting aggressor to seller)
362        if self.core.last.is_some_and(|last| bar.low < last) {
363            trade_tick.price = bar.low;
364            trade_tick.aggressor_side = AggressorSide::Seller;
365            trade_tick.trade_id = self.ids_generator.generate_trade_id();
366
367            self.book.update_trade_tick(&trade_tick).unwrap();
368            self.iterate(trade_tick.ts_init);
369
370            self.core.set_last_raw(trade_tick.price);
371        }
372
373        // Close
374        // Check if not the same as last
375        // Assumption: if close price is higher then last, aggressor is buyer
376        // Assumption: if close price is lower then last, aggressor is seller
377        if self.core.last.is_some_and(|last| bar.close != last) {
378            trade_tick.price = bar.close;
379            trade_tick.aggressor_side = if bar.close > self.core.last.unwrap() {
380                AggressorSide::Buyer
381            } else {
382                AggressorSide::Seller
383            };
384            trade_tick.trade_id = self.ids_generator.generate_trade_id();
385
386            self.book.update_trade_tick(&trade_tick).unwrap();
387            self.iterate(trade_tick.ts_init);
388
389            self.core.set_last_raw(trade_tick.price);
390        }
391    }
392
393    fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
394        // Wait for next bar
395        if self.last_bar_bid.is_none()
396            || self.last_bar_ask.is_none()
397            || self.last_bar_bid.unwrap().ts_event != self.last_bar_ask.unwrap().ts_event
398        {
399            return;
400        }
401        let bid_bar = self.last_bar_bid.unwrap();
402        let ask_bar = self.last_bar_ask.unwrap();
403        let bid_size = Quantity::new(bid_bar.volume.as_f64() / 4.0, bar.volume.precision);
404        let ask_size = Quantity::new(ask_bar.volume.as_f64() / 4.0, bar.volume.precision);
405
406        // Create reusable quote tick
407        let mut quote_tick = QuoteTick::new(
408            self.book.instrument_id,
409            bid_bar.open,
410            ask_bar.open,
411            bid_size,
412            ask_size,
413            bid_bar.ts_init,
414            bid_bar.ts_init,
415        );
416
417        // Open
418        self.book.update_quote_tick(&quote_tick).unwrap();
419        self.iterate(quote_tick.ts_init);
420
421        // High
422        quote_tick.bid_price = bid_bar.high;
423        quote_tick.ask_price = ask_bar.high;
424        self.book.update_quote_tick(&quote_tick).unwrap();
425        self.iterate(quote_tick.ts_init);
426
427        // Low
428        quote_tick.bid_price = bid_bar.low;
429        quote_tick.ask_price = ask_bar.low;
430        self.book.update_quote_tick(&quote_tick).unwrap();
431        self.iterate(quote_tick.ts_init);
432
433        // Close
434        quote_tick.bid_price = bid_bar.close;
435        quote_tick.ask_price = ask_bar.close;
436        self.book.update_quote_tick(&quote_tick).unwrap();
437        self.iterate(quote_tick.ts_init);
438
439        // Reset last bars
440        self.last_bar_bid = None;
441        self.last_bar_ask = None;
442    }
443
444    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
445        log::debug!("Processing {trade}");
446
447        if self.book_type == BookType::L1_MBP {
448            self.book.update_trade_tick(trade).unwrap();
449        }
450        self.core.set_last_raw(trade.price);
451
452        self.iterate(trade.ts_event);
453    }
454
455    pub fn process_status(&mut self, action: MarketStatusAction) {
456        log::debug!("Processing {action}");
457
458        // Check if market is closed and market opens with trading or pre-open status
459        if self.market_status == MarketStatus::Closed
460            && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
461        {
462            self.market_status = MarketStatus::Open;
463        }
464        // Check if market is open and market pauses
465        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
466            self.market_status = MarketStatus::Paused;
467        }
468        // Check if market is open and market suspends
469        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
470            self.market_status = MarketStatus::Suspended;
471        }
472        // Check if market is open and we halt or close
473        if self.market_status == MarketStatus::Open
474            && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
475        {
476            self.market_status = MarketStatus::Closed;
477        }
478    }
479
480    // -- TRADING COMMANDS ------------------------------------------------------------------------
481
482    #[allow(clippy::needless_return)]
483    pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
484        // Enter the scope where you will borrow a cache
485        {
486            let cache_borrow = self.cache.as_ref().borrow();
487
488            if self.core.order_exists(order.client_order_id()) {
489                self.generate_order_rejected(order, "Order already exists".into());
490                return;
491            }
492
493            // Index identifiers
494            self.account_ids.insert(order.trader_id(), account_id);
495
496            // Check for instrument expiration or activation
497            if EXPIRING_INSTRUMENT_TYPES.contains(&self.instrument.instrument_class()) {
498                if let Some(activation_ns) = self.instrument.activation_ns() {
499                    if self.clock.borrow().timestamp_ns() < activation_ns {
500                        self.generate_order_rejected(
501                            order,
502                            format!(
503                                "Contract {} is not yet active, activation {}",
504                                self.instrument.id(),
505                                self.instrument.activation_ns().unwrap()
506                            )
507                            .into(),
508                        );
509                        return;
510                    }
511                }
512                if let Some(expiration_ns) = self.instrument.expiration_ns() {
513                    if self.clock.borrow().timestamp_ns() >= expiration_ns {
514                        self.generate_order_rejected(
515                            order,
516                            format!(
517                                "Contract {} has expired, expiration {}",
518                                self.instrument.id(),
519                                self.instrument.expiration_ns().unwrap()
520                            )
521                            .into(),
522                        );
523                        return;
524                    }
525                }
526            }
527
528            // Contingent orders checks
529            if self.config.support_contingent_orders {
530                if let Some(parent_order_id) = order.parent_order_id() {
531                    let parent_order = cache_borrow.order(&parent_order_id);
532                    if parent_order.is_none()
533                        || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
534                    {
535                        panic!("OTO parent not found");
536                    }
537                    if let Some(parent_order) = parent_order {
538                        let parent_order_status = parent_order.status();
539                        let order_is_open = order.is_open();
540                        if parent_order.status() == OrderStatus::Rejected && order.is_open() {
541                            self.generate_order_rejected(
542                                order,
543                                format!("Rejected OTO order from {parent_order_id}").into(),
544                            );
545                            return;
546                        } else if parent_order.status() == OrderStatus::Accepted
547                            && parent_order.status() == OrderStatus::Triggered
548                        {
549                            log::info!(
550                                "Pending OTO order {} triggers from {parent_order_id}",
551                                order.client_order_id(),
552                            );
553                            return;
554                        }
555                    }
556                }
557
558                if let Some(linked_order_ids) = order.linked_order_ids() {
559                    for client_order_id in linked_order_ids {
560                        match cache_borrow.order(client_order_id) {
561                            Some(contingent_order)
562                                if (order.contingency_type().unwrap() == ContingencyType::Oco
563                                    || order.contingency_type().unwrap()
564                                        == ContingencyType::Ouo)
565                                    && !order.is_closed()
566                                    && contingent_order.is_closed() =>
567                            {
568                                self.generate_order_rejected(
569                                    order,
570                                    format!("Contingent order {client_order_id} already closed")
571                                        .into(),
572                                );
573                                return;
574                            }
575                            None => panic!("Cannot find contingent order for {client_order_id}"),
576                            _ => {}
577                        }
578                    }
579                }
580            }
581
582            // Check fo valid order quantity precision
583            if order.quantity().precision != self.instrument.size_precision() {
584                self.generate_order_rejected(
585                    order,
586                    format!(
587                        "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
588                        order.client_order_id(),
589                        order.quantity().precision,
590                        self.instrument.id(),
591                        self.instrument.size_precision()
592                    )
593                        .into(),
594                );
595                return;
596            }
597
598            // Check for valid order price precision
599            if let Some(price) = order.price() {
600                if price.precision != self.instrument.price_precision() {
601                    self.generate_order_rejected(
602                        order,
603                        format!(
604                            "Invalid order price precision for order {}, was {} when {} price precision is {}",
605                            order.client_order_id(),
606                            price.precision,
607                            self.instrument.id(),
608                            self.instrument.price_precision()
609                        )
610                            .into(),
611                    );
612                    return;
613                }
614            }
615
616            // Check for valid order trigger price precision
617            if let Some(trigger_price) = order.trigger_price() {
618                if trigger_price.precision != self.instrument.price_precision() {
619                    self.generate_order_rejected(
620                        order,
621                        format!(
622                            "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
623                            order.client_order_id(),
624                            trigger_price.precision,
625                            self.instrument.id(),
626                            self.instrument.price_precision()
627                        )
628                            .into(),
629                    );
630                    return;
631                }
632            }
633
634            // Get position if exists
635            let position: Option<&Position> = cache_borrow
636                .position_for_order(&order.client_order_id())
637                .or_else(|| {
638                    if self.oms_type == OmsType::Netting {
639                        let position_id = PositionId::new(
640                            format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
641                        );
642                        cache_borrow.position(&position_id)
643                    } else {
644                        None
645                    }
646                });
647
648            // Check not shorting an equity without a MARGIN account
649            if order.order_side() == OrderSide::Sell
650                && self.account_type != AccountType::Margin
651                && matches!(self.instrument, InstrumentAny::Equity(_))
652                && (position.is_none()
653                    || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
654            {
655                let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
656                self.generate_order_rejected(
657                    order,
658                    format!(
659                        "Short selling not permitted on a CASH account with position {position_string} and order {order}",
660                    )
661                        .into(),
662                );
663                return;
664            }
665
666            // Check reduce-only instruction
667            if self.config.use_reduce_only
668                && order.is_reduce_only()
669                && !order.is_closed()
670                && position.is_none_or(|pos| {
671                    pos.is_closed()
672                        || (order.is_buy() && pos.is_long())
673                        || (order.is_sell() && pos.is_short())
674                })
675            {
676                self.generate_order_rejected(
677                    order,
678                    format!(
679                        "Reduce-only order {} ({}-{}) would have increased position",
680                        order.client_order_id(),
681                        order.order_type().to_string().to_uppercase(),
682                        order.order_side().to_string().to_uppercase()
683                    )
684                    .into(),
685                );
686                return;
687            }
688        }
689
690        match order.order_type() {
691            OrderType::Market => self.process_market_order(order),
692            OrderType::Limit => self.process_limit_order(order),
693            OrderType::MarketToLimit => self.process_market_to_limit_order(order),
694            OrderType::StopMarket => self.process_stop_market_order(order),
695            OrderType::StopLimit => self.process_stop_limit_order(order),
696            OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
697            OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
698            OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
699            OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
700        }
701    }
702
703    pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
704        if let Some(order) = self.core.get_order(command.client_order_id) {
705            self.update_order(
706                &mut order.to_any(),
707                command.quantity,
708                command.price,
709                command.trigger_price,
710                None,
711            );
712        } else {
713            self.generate_order_modify_rejected(
714                command.trader_id,
715                command.strategy_id,
716                command.instrument_id,
717                command.client_order_id,
718                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
719                Some(command.venue_order_id),
720                Some(account_id),
721            );
722        }
723    }
724
725    pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
726        match self.core.get_order(command.client_order_id) {
727            Some(passive_order) => {
728                if passive_order.is_inflight() || passive_order.is_open() {
729                    self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
730                }
731            }
732            None => self.generate_order_cancel_rejected(
733                command.trader_id,
734                command.strategy_id,
735                account_id,
736                command.instrument_id,
737                command.client_order_id,
738                command.venue_order_id,
739                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
740            ),
741        }
742    }
743
744    pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
745        let open_orders = self
746            .cache
747            .borrow()
748            .orders_open(None, Some(&command.instrument_id), None, None)
749            .into_iter()
750            .cloned()
751            .collect::<Vec<OrderAny>>();
752        for order in open_orders {
753            if command.order_side != OrderSide::NoOrderSide
754                && command.order_side != order.order_side()
755            {
756                continue;
757            }
758            if order.is_inflight() || order.is_open() {
759                self.cancel_order(&order, None);
760            }
761        }
762    }
763
764    pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
765        for order in &command.cancels {
766            self.process_cancel(order, account_id);
767        }
768    }
769
770    fn process_market_order(&mut self, order: &mut OrderAny) {
771        if order.time_in_force() == TimeInForce::AtTheOpen
772            || order.time_in_force() == TimeInForce::AtTheClose
773        {
774            log::error!(
775                "Market auction for the time in force {} is currently not supported",
776                order.time_in_force()
777            );
778            return;
779        }
780
781        // Check if market exists
782        let order_side = order.order_side();
783        let is_ask_initialized = self.core.is_ask_initialized;
784        let is_bid_initialized = self.core.is_bid_initialized;
785        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
786            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
787        {
788            self.generate_order_rejected(
789                order,
790                format!("No market for {}", order.instrument_id()).into(),
791            );
792            return;
793        }
794
795        self.fill_market_order(order);
796    }
797
798    fn process_limit_order(&mut self, order: &mut OrderAny) {
799        let limit_px = order.price().expect("Limit order must have a price");
800        if order.is_post_only()
801            && self
802                .core
803                .is_limit_matched(order.order_side_specified(), limit_px)
804        {
805            self.generate_order_rejected(
806                order,
807                format!(
808                    "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
809                    order.order_type(),
810                    order.order_side(),
811                    order.price().unwrap(),
812                    self.core
813                        .bid
814                        .map_or_else(|| "None".to_string(), |p| p.to_string()),
815                    self.core
816                        .ask
817                        .map_or_else(|| "None".to_string(), |p| p.to_string())
818                )
819                .into(),
820            );
821            return;
822        }
823
824        // Order is valid and accepted
825        self.accept_order(order);
826
827        // Check for immediate fill
828        if self
829            .core
830            .is_limit_matched(order.order_side_specified(), limit_px)
831        {
832            // Filling as liquidity taker
833            if order.liquidity_side().is_some()
834                && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
835            {
836                order.set_liquidity_side(LiquiditySide::Taker);
837            }
838            self.fill_limit_order(order);
839        } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
840            self.cancel_order(order, None);
841        }
842    }
843
844    fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
845        // Check that market exists
846        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
847            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
848        {
849            self.generate_order_rejected(
850                order,
851                format!("No market for {}", order.instrument_id()).into(),
852            );
853            return;
854        }
855
856        // Immediately fill marketable order
857        self.fill_market_order(order);
858
859        if order.is_open() {
860            self.accept_order(order);
861        }
862    }
863
864    fn process_stop_market_order(&mut self, order: &mut OrderAny) {
865        let stop_px = order
866            .trigger_price()
867            .expect("Stop order must have a trigger price");
868        if self
869            .core
870            .is_stop_matched(order.order_side_specified(), stop_px)
871        {
872            if self.config.reject_stop_orders {
873                self.generate_order_rejected(
874                    order,
875                    format!(
876                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
877                        order.order_type(),
878                        order.order_side(),
879                        order.trigger_price().unwrap(),
880                        self.core
881                            .bid
882                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
883                        self.core
884                            .ask
885                            .map_or_else(|| "None".to_string(), |p| p.to_string())
886                    ).into(),
887                );
888                return;
889            }
890            self.fill_market_order(order);
891            return;
892        }
893
894        // order is not matched but is valid and we accept it
895        self.accept_order(order);
896    }
897
898    fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
899        let stop_px = order
900            .trigger_price()
901            .expect("Stop order must have a trigger price");
902        if self
903            .core
904            .is_stop_matched(order.order_side_specified(), stop_px)
905        {
906            if self.config.reject_stop_orders {
907                self.generate_order_rejected(
908                    order,
909                    format!(
910                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
911                        order.order_type(),
912                        order.order_side(),
913                        order.trigger_price().unwrap(),
914                        self.core
915                            .bid
916                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
917                        self.core
918                            .ask
919                            .map_or_else(|| "None".to_string(), |p| p.to_string())
920                    ).into(),
921                );
922                return;
923            }
924
925            self.accept_order(order);
926            self.generate_order_triggered(order);
927
928            // Check for immediate fill
929            let limit_px = order.price().expect("Stop limit order must have a price");
930            if self
931                .core
932                .is_limit_matched(order.order_side_specified(), limit_px)
933            {
934                order.set_liquidity_side(LiquiditySide::Taker);
935                self.fill_limit_order(order);
936            }
937        }
938
939        // order is not matched but is valid and we accept it
940        self.accept_order(order);
941    }
942
943    fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
944        if self
945            .core
946            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
947        {
948            if self.config.reject_stop_orders {
949                self.generate_order_rejected(
950                    order,
951                    format!(
952                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
953                        order.order_type(),
954                        order.order_side(),
955                        order.trigger_price().unwrap(),
956                        self.core
957                            .bid
958                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
959                        self.core
960                            .ask
961                            .map_or_else(|| "None".to_string(), |p| p.to_string())
962                    ).into(),
963                );
964                return;
965            }
966            self.fill_market_order(order);
967            return;
968        }
969
970        // Order is valid and accepted
971        self.accept_order(order);
972    }
973
974    fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
975        if self
976            .core
977            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
978        {
979            if self.config.reject_stop_orders {
980                self.generate_order_rejected(
981                    order,
982                    format!(
983                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
984                        order.order_type(),
985                        order.order_side(),
986                        order.trigger_price().unwrap(),
987                        self.core
988                            .bid
989                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
990                        self.core
991                            .ask
992                            .map_or_else(|| "None".to_string(), |p| p.to_string())
993                    ).into(),
994                );
995                return;
996            }
997            self.accept_order(order);
998            self.generate_order_triggered(order);
999
1000            // Check if immediate marketable
1001            if self
1002                .core
1003                .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1004            {
1005                order.set_liquidity_side(LiquiditySide::Taker);
1006                self.fill_limit_order(order);
1007            }
1008            return;
1009        }
1010
1011        // Order is valid and accepted
1012        self.accept_order(order);
1013    }
1014
1015    fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1016        if let Some(trigger_price) = order.trigger_price() {
1017            if self
1018                .core
1019                .is_stop_matched(order.order_side_specified(), trigger_price)
1020            {
1021                self.generate_order_rejected(
1022                    order,
1023                    format!(
1024                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1025                        order.order_type(),
1026                        order.order_side(),
1027                        trigger_price,
1028                        self.core
1029                            .bid
1030                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1031                        self.core
1032                            .ask
1033                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1034                    ).into(),
1035                );
1036                return;
1037            }
1038        }
1039
1040        // Order is valid and accepted
1041        self.accept_order(order);
1042    }
1043
1044    // -- ORDER PROCESSING ----------------------------------------------------
1045
1046    /// Iterate the matching engine by processing the bid and ask order sides
1047    /// and advancing time up to the given UNIX `timestamp_ns`.
1048    pub fn iterate(&mut self, timestamp_ns: UnixNanos) {
1049        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
1050
1051        // Check for updates in orderbook and set bid and ask in order matching core and iterate
1052        if self.book.has_bid() {
1053            self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1054        }
1055        if self.book.has_ask() {
1056            self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1057        }
1058        self.core.iterate();
1059
1060        self.core.bid = self.book.best_bid_price();
1061        self.core.ask = self.book.best_ask_price();
1062
1063        let orders_bid = self.core.get_orders_bid().to_vec();
1064        let orders_ask = self.core.get_orders_ask().to_vec();
1065
1066        self.iterate_orders(timestamp_ns, &orders_bid);
1067        self.iterate_orders(timestamp_ns, &orders_ask);
1068    }
1069
1070    fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1071        for order in orders {
1072            if order.is_closed() {
1073                continue;
1074            }
1075
1076            // Check expiration
1077            if self.config.support_gtd_orders {
1078                if let Some(expire_time) = order.expire_time() {
1079                    if timestamp_ns >= expire_time {
1080                        // SAFTEY: We know this order is in the core
1081                        self.core.delete_order(order).unwrap();
1082                        self.cached_filled_qty.remove(&order.client_order_id());
1083                        self.expire_order(order);
1084                    }
1085                }
1086            }
1087
1088            // Manage trailing stop
1089            if let PassiveOrderAny::Stop(o) = order {
1090                if let PassiveOrderAny::Stop(
1091                    StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_),
1092                ) = order
1093                {
1094                    let mut order = OrderAny::from(o.to_owned());
1095                    self.update_trailing_stop_order(&mut order);
1096                }
1097            }
1098
1099            // Move market back to targets
1100            if let Some(target_bid) = self.target_bid {
1101                self.core.bid = Some(target_bid);
1102                self.target_bid = None;
1103            }
1104            if let Some(target_ask) = self.target_ask {
1105                self.core.ask = Some(target_ask);
1106                self.target_ask = None;
1107            }
1108            if let Some(target_last) = self.target_last {
1109                self.core.last = Some(target_last);
1110                self.target_last = None;
1111            }
1112        }
1113
1114        // Reset any targets after iteration
1115        self.target_bid = None;
1116        self.target_ask = None;
1117        self.target_last = None;
1118    }
1119
1120    fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1121        match order.price() {
1122            Some(order_price) => {
1123                // construct book order with price as passive with limit order price
1124                let book_order =
1125                    BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
1126
1127                let mut fills = self.book.simulate_fills(&book_order);
1128
1129                // return immediately if no fills
1130                if fills.is_empty() {
1131                    return fills;
1132                }
1133
1134                // check if trigger price exists
1135                if let Some(triggered_price) = order.trigger_price() {
1136                    // Filling as TAKER from trigger
1137                    if order
1138                        .liquidity_side()
1139                        .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
1140                    {
1141                        if order.order_side() == OrderSide::Sell && order_price > triggered_price {
1142                            // manually change the fills index 0
1143                            let first_fill = fills.first().unwrap();
1144                            let triggered_qty = first_fill.1;
1145                            fills[0] = (triggered_price, triggered_qty);
1146                            self.target_bid = self.core.bid;
1147                            self.target_ask = self.core.ask;
1148                            self.target_last = self.core.last;
1149                            self.core.set_ask_raw(order_price);
1150                            self.core.set_last_raw(order_price);
1151                        } else if order.order_side() == OrderSide::Buy
1152                            && order_price < triggered_price
1153                        {
1154                            // manually change the fills index 0
1155                            let first_fill = fills.first().unwrap();
1156                            let triggered_qty = first_fill.1;
1157                            fills[0] = (triggered_price, triggered_qty);
1158                            self.target_bid = self.core.bid;
1159                            self.target_ask = self.core.ask;
1160                            self.target_last = self.core.last;
1161                            self.core.set_bid_raw(order_price);
1162                            self.core.set_last_raw(order_price);
1163                        }
1164                    }
1165                }
1166
1167                // Filling as MAKER from trigger
1168                if order
1169                    .liquidity_side()
1170                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1171                {
1172                    match order.order_side().as_specified() {
1173                        OrderSideSpecified::Buy => {
1174                            let target_price = if order
1175                                .trigger_price()
1176                                .is_some_and(|trigger_price| order_price > trigger_price)
1177                            {
1178                                order.trigger_price().unwrap()
1179                            } else {
1180                                order_price
1181                            };
1182                            for fill in &fills {
1183                                let last_px = fill.0;
1184                                if last_px < order_price {
1185                                    // Marketable SELL would have filled at limit
1186                                    self.target_bid = self.core.bid;
1187                                    self.target_ask = self.core.ask;
1188                                    self.target_last = self.core.last;
1189                                    self.core.set_ask_raw(target_price);
1190                                    self.core.set_last_raw(target_price);
1191                                }
1192                            }
1193                        }
1194                        OrderSideSpecified::Sell => {
1195                            let target_price = if order
1196                                .trigger_price()
1197                                .is_some_and(|trigger_price| order_price < trigger_price)
1198                            {
1199                                order.trigger_price().unwrap()
1200                            } else {
1201                                order_price
1202                            };
1203                            for fill in &fills {
1204                                let last_px = fill.0;
1205                                if last_px > order_price {
1206                                    // Marketable BUY would have filled at limit
1207                                    self.target_bid = self.core.bid;
1208                                    self.target_ask = self.core.ask;
1209                                    self.target_last = self.core.last;
1210                                    self.core.set_bid_raw(target_price);
1211                                    self.core.set_last_raw(target_price);
1212                                }
1213                            }
1214                        }
1215                    }
1216                }
1217
1218                fills
1219            }
1220            None => panic!("Limit order must have a price"),
1221        }
1222    }
1223
1224    fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1225        // construct price
1226        let price = match order.order_side().as_specified() {
1227            OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1228            OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1229        };
1230
1231        // Construct BookOrder from order
1232        let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1233        self.book.simulate_fills(&book_order)
1234    }
1235
1236    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1237        if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id()) {
1238            if filled_qty >= &order.quantity() {
1239                log::info!(
1240                    "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1241                    filled_qty,
1242                    order.quantity(),
1243                    order.filled_qty(),
1244                    order.quantity()
1245                );
1246                return;
1247            }
1248        }
1249
1250        let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1251        let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1252            let cache = self.cache.as_ref().borrow();
1253            cache.position(&venue_position_id).cloned()
1254        } else {
1255            None
1256        };
1257
1258        if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1259            log::warn!(
1260                "Canceling REDUCE_ONLY {} as would increase position",
1261                order.order_type()
1262            );
1263            self.cancel_order(order, None);
1264            return;
1265        }
1266        // set order side as taker
1267        order.set_liquidity_side(LiquiditySide::Taker);
1268        let fills = self.determine_market_price_and_volume(order);
1269        self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1270    }
1271
1272    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1273        match order.price() {
1274            Some(order_price) => {
1275                let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1276                if cached_filled_qty.is_some() && *cached_filled_qty.unwrap() >= order.quantity() {
1277                    log::debug!(
1278                        "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1279                        cached_filled_qty.unwrap(),
1280                        order.quantity(),
1281                        order.filled_qty(),
1282                        order.leaves_qty(),
1283                    );
1284                    return;
1285                }
1286
1287                if order
1288                    .liquidity_side()
1289                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1290                {
1291                    if order.order_side() == OrderSide::Buy
1292                        && self.core.bid.is_some_and(|bid| bid == order_price)
1293                        && !self.fill_model.is_limit_filled()
1294                    {
1295                        // no filled
1296                        return;
1297                    }
1298                    if order.order_side() == OrderSide::Sell
1299                        && self.core.ask.is_some_and(|ask| ask == order_price)
1300                        && !self.fill_model.is_limit_filled()
1301                    {
1302                        // no filled
1303                        return;
1304                    }
1305                }
1306
1307                let venue_position_id = self.ids_generator.get_position_id(order, None);
1308                let position = if let Some(venue_position_id) = venue_position_id {
1309                    let cache = self.cache.as_ref().borrow();
1310                    cache.position(&venue_position_id).cloned()
1311                } else {
1312                    None
1313                };
1314
1315                if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1316                    log::warn!(
1317                        "Canceling REDUCE_ONLY {} as would increase position",
1318                        order.order_type()
1319                    );
1320                    self.cancel_order(order, None);
1321                    return;
1322                }
1323
1324                let fills = self.determine_limit_price_and_volume(order);
1325
1326                self.apply_fills(
1327                    order,
1328                    fills,
1329                    order.liquidity_side().unwrap(),
1330                    venue_position_id,
1331                    position,
1332                );
1333            }
1334            None => panic!("Limit order must have a price"),
1335        }
1336    }
1337
1338    fn apply_fills(
1339        &mut self,
1340        order: &mut OrderAny,
1341        fills: Vec<(Price, Quantity)>,
1342        liquidity_side: LiquiditySide,
1343        venue_position_id: Option<PositionId>,
1344        position: Option<Position>,
1345    ) {
1346        if order.time_in_force() == TimeInForce::Fok {
1347            let mut total_size = Quantity::zero(order.quantity().precision);
1348            for (fill_px, fill_qty) in &fills {
1349                total_size = total_size.add(*fill_qty);
1350            }
1351
1352            if order.leaves_qty() > total_size {
1353                self.cancel_order(order, None);
1354                return;
1355            }
1356        }
1357
1358        if fills.is_empty() {
1359            if order.status() == OrderStatus::Submitted {
1360                self.generate_order_rejected(
1361                    order,
1362                    format!("No market for {}", order.instrument_id()).into(),
1363                );
1364            } else {
1365                log::error!(
1366                    "Cannot fill order: no fills from book when fills were expected (check size in data)"
1367                );
1368                return;
1369            }
1370        }
1371
1372        if self.oms_type == OmsType::Netting {
1373            let venue_position_id: Option<PositionId> = None;
1374        }
1375
1376        let mut initial_market_to_limit_fill = false;
1377        for &(mut fill_px, ref fill_qty) in &fills {
1378            // Validate price precision
1379            assert!(
1380                (fill_px.precision == self.instrument.price_precision()),
1381                "Invalid price precision for fill price {} when instrument price precision is {}.\
1382                     Check that the data price precision matches the {} instrument",
1383                fill_px.precision,
1384                self.instrument.price_precision(),
1385                self.instrument.id()
1386            );
1387
1388            // Validate quantity precision
1389            assert!(
1390                (fill_qty.precision == self.instrument.size_precision()),
1391                "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
1392                     Check that the data quantity precision matches the {} instrument",
1393                fill_qty.precision,
1394                self.instrument.size_precision(),
1395                self.instrument.id()
1396            );
1397
1398            if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
1399                && order.order_type() == OrderType::MarketToLimit
1400            {
1401                self.generate_order_updated(order, order.quantity(), Some(fill_px), None);
1402                initial_market_to_limit_fill = true;
1403            }
1404
1405            if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
1406                fill_px = match order.order_side().as_specified() {
1407                    OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
1408                    OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
1409                }
1410            }
1411
1412            // Check reduce only order
1413            if self.config.use_reduce_only && order.is_reduce_only() {
1414                if let Some(position) = &position {
1415                    if *fill_qty > position.quantity {
1416                        if position.quantity == Quantity::zero(position.quantity.precision) {
1417                            // Done
1418                            return;
1419                        }
1420
1421                        // Adjust fill to honor reduce only execution (fill remaining position size only)
1422                        let adjusted_fill_qty =
1423                            Quantity::from_raw(position.quantity.raw, fill_qty.precision);
1424
1425                        self.generate_order_updated(order, adjusted_fill_qty, None, None);
1426                    }
1427                }
1428            }
1429
1430            if fill_qty.is_zero() {
1431                if fills.len() == 1 && order.status() == OrderStatus::Submitted {
1432                    self.generate_order_rejected(
1433                        order,
1434                        format!("No market for {}", order.instrument_id()).into(),
1435                    );
1436                }
1437                return;
1438            }
1439
1440            self.fill_order(
1441                order,
1442                fill_px,
1443                *fill_qty,
1444                liquidity_side,
1445                venue_position_id,
1446                position.clone(),
1447            );
1448
1449            if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
1450                // filled initial level
1451                return;
1452            }
1453        }
1454
1455        if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
1456            // IOC order has filled all available size
1457            self.cancel_order(order, None);
1458            return;
1459        }
1460
1461        if order.is_open()
1462            && self.book_type == BookType::L1_MBP
1463            && matches!(
1464                order.order_type(),
1465                OrderType::Market | OrderType::MarketIfTouched | OrderType::StopMarket
1466            )
1467        {
1468            // Exhausted simulated book volume (continue aggressive filling into next level)
1469            // This is a very basic implementation of slipping by a single tick, in the future
1470            // we will implement more detailed fill modeling.
1471            todo!("Exhausted simulated book volume")
1472        }
1473    }
1474
1475    fn fill_order(
1476        &mut self,
1477        order: &mut OrderAny,
1478        last_px: Price,
1479        last_qty: Quantity,
1480        liquidity_side: LiquiditySide,
1481        venue_position_id: Option<PositionId>,
1482        position: Option<Position>,
1483    ) {
1484        match self.cached_filled_qty.get(&order.client_order_id()) {
1485            Some(filled_qty) => {
1486                let leaves_qty = order.quantity() - *filled_qty;
1487                let last_qty = min(last_qty, leaves_qty);
1488                let new_filled_qty = *filled_qty + last_qty;
1489                // update cached filled qty
1490                self.cached_filled_qty
1491                    .insert(order.client_order_id(), new_filled_qty);
1492            }
1493            None => {
1494                self.cached_filled_qty
1495                    .insert(order.client_order_id(), last_qty);
1496            }
1497        }
1498
1499        // calculate commission
1500        let commission = self
1501            .fee_model
1502            .get_commission(order, last_qty, last_px, &self.instrument)
1503            .unwrap();
1504
1505        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1506        self.generate_order_filled(
1507            order,
1508            venue_order_id,
1509            venue_position_id,
1510            last_qty,
1511            last_px,
1512            self.instrument.quote_currency(),
1513            commission,
1514            liquidity_side,
1515        );
1516
1517        if order.is_passive() && order.is_closed() {
1518            // Check if order exists in OrderMatching core, and delete it if it does
1519            if self.core.order_exists(order.client_order_id()) {
1520                let _ = self
1521                    .core
1522                    .delete_order(&PassiveOrderAny::from(order.clone()));
1523            }
1524            self.cached_filled_qty.remove(&order.client_order_id());
1525        }
1526
1527        if !self.config.support_contingent_orders {
1528            return;
1529        }
1530
1531        if let Some(contingency_type) = order.contingency_type() {
1532            match contingency_type {
1533                ContingencyType::Oto => {
1534                    if let Some(linked_orders_ids) = order.linked_order_ids() {
1535                        for client_order_id in linked_orders_ids {
1536                            let mut child_order = match self.cache.borrow().order(client_order_id) {
1537                                Some(child_order) => child_order.clone(),
1538                                None => panic!("Order {client_order_id} not found in cache"),
1539                            };
1540
1541                            if child_order.is_closed() || child_order.is_active_local() {
1542                                continue;
1543                            }
1544
1545                            // Check if we need to index position id
1546                            if let (None, Some(position_id)) =
1547                                (child_order.position_id(), order.position_id())
1548                            {
1549                                self.cache
1550                                    .borrow_mut()
1551                                    .add_position_id(
1552                                        &position_id,
1553                                        &self.venue,
1554                                        client_order_id,
1555                                        &child_order.strategy_id(),
1556                                    )
1557                                    .unwrap();
1558                                log::debug!(
1559                                    "Added position id {position_id} to cache for order {client_order_id}"
1560                                );
1561                            }
1562
1563                            if (!child_order.is_open())
1564                                || (matches!(child_order.status(), OrderStatus::PendingUpdate)
1565                                    && child_order
1566                                        .previous_status()
1567                                        .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
1568                            {
1569                                let account_id = order.account_id().unwrap_or_else(|| {
1570                                    *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
1571                                        panic!(
1572                                            "Account ID not found for trader {}",
1573                                            order.trader_id()
1574                                        )
1575                                    })
1576                                });
1577                                self.process_order(&mut child_order, account_id);
1578                            }
1579                        }
1580                    } else {
1581                        log::error!(
1582                            "OTO order {} does not have linked orders",
1583                            order.client_order_id()
1584                        );
1585                    }
1586                }
1587                ContingencyType::Oco => {
1588                    if let Some(linked_orders_ids) = order.linked_order_ids() {
1589                        for client_order_id in linked_orders_ids {
1590                            let child_order = match self.cache.borrow().order(client_order_id) {
1591                                Some(child_order) => child_order.clone(),
1592                                None => panic!("Order {client_order_id} not found in cache"),
1593                            };
1594
1595                            if child_order.is_closed() || child_order.is_active_local() {
1596                                continue;
1597                            }
1598
1599                            self.cancel_order(&child_order, None);
1600                        }
1601                    } else {
1602                        log::error!(
1603                            "OCO order {} does not have linked orders",
1604                            order.client_order_id()
1605                        );
1606                    }
1607                }
1608                ContingencyType::Ouo => {
1609                    if let Some(linked_orders_ids) = order.linked_order_ids() {
1610                        for client_order_id in linked_orders_ids {
1611                            let mut child_order = match self.cache.borrow().order(client_order_id) {
1612                                Some(child_order) => child_order.clone(),
1613                                None => panic!("Order {client_order_id} not found in cache"),
1614                            };
1615
1616                            if child_order.is_active_local() {
1617                                continue;
1618                            }
1619
1620                            if order.is_closed() && child_order.is_open() {
1621                                self.cancel_order(&child_order, None);
1622                            } else if !order.leaves_qty().is_zero()
1623                                && order.leaves_qty() != child_order.leaves_qty()
1624                            {
1625                                let price = child_order.price();
1626                                let trigger_price = child_order.trigger_price();
1627                                self.update_order(
1628                                    &mut child_order,
1629                                    Some(order.leaves_qty()),
1630                                    price,
1631                                    trigger_price,
1632                                    Some(false),
1633                                );
1634                            }
1635                        }
1636                    } else {
1637                        log::error!(
1638                            "OUO order {} does not have linked orders",
1639                            order.client_order_id()
1640                        );
1641                    }
1642                }
1643                _ => {}
1644            }
1645        }
1646    }
1647
1648    fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
1649        if self
1650            .core
1651            .is_limit_matched(order.order_side_specified(), price)
1652        {
1653            if order.is_post_only() {
1654                self.generate_order_modify_rejected(
1655                    order.trader_id(),
1656                    order.strategy_id(),
1657                    order.instrument_id(),
1658                    order.client_order_id(),
1659                    Ustr::from(format!(
1660                        "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1661                        order.order_type(),
1662                        order.order_side(),
1663                        price,
1664                        self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1665                        self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1666                    ).as_str()),
1667                    order.venue_order_id(),
1668                    order.account_id(),
1669                );
1670                return;
1671            }
1672
1673            self.generate_order_updated(order, quantity, Some(price), None);
1674            order.set_liquidity_side(LiquiditySide::Taker);
1675            self.fill_limit_order(order);
1676            return;
1677        }
1678        self.generate_order_updated(order, quantity, Some(price), None);
1679    }
1680
1681    fn update_stop_market_order(
1682        &mut self,
1683        order: &mut OrderAny,
1684        quantity: Quantity,
1685        trigger_price: Price,
1686    ) {
1687        if self
1688            .core
1689            .is_stop_matched(order.order_side_specified(), trigger_price)
1690        {
1691            self.generate_order_modify_rejected(
1692                order.trader_id(),
1693                order.strategy_id(),
1694                order.instrument_id(),
1695                order.client_order_id(),
1696                Ustr::from(
1697                    format!(
1698                        "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1699                        order.order_type(),
1700                        order.order_side(),
1701                        trigger_price,
1702                        self.core
1703                            .bid
1704                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1705                        self.core
1706                            .ask
1707                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1708                    )
1709                    .as_str(),
1710                ),
1711                order.venue_order_id(),
1712                order.account_id(),
1713            );
1714            return;
1715        }
1716
1717        self.generate_order_updated(order, quantity, None, Some(trigger_price));
1718    }
1719
1720    fn update_stop_limit_order(
1721        &mut self,
1722        order: &mut OrderAny,
1723        quantity: Quantity,
1724        price: Price,
1725        trigger_price: Price,
1726    ) {
1727        if order.is_triggered().is_some_and(|t| t) {
1728            // Update limit price
1729            if self
1730                .core
1731                .is_limit_matched(order.order_side_specified(), price)
1732            {
1733                if order.is_post_only() {
1734                    self.generate_order_modify_rejected(
1735                        order.trader_id(),
1736                        order.strategy_id(),
1737                        order.instrument_id(),
1738                        order.client_order_id(),
1739                        Ustr::from(format!(
1740                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1741                            order.order_type(),
1742                            order.order_side(),
1743                            price,
1744                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1745                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1746                        ).as_str()),
1747                        order.venue_order_id(),
1748                        order.account_id(),
1749                    );
1750                    return;
1751                }
1752                self.generate_order_updated(order, quantity, Some(price), None);
1753                order.set_liquidity_side(LiquiditySide::Taker);
1754                self.fill_limit_order(order);
1755                return; // Filled
1756            }
1757        } else {
1758            // Update stop price
1759            if self
1760                .core
1761                .is_stop_matched(order.order_side_specified(), trigger_price)
1762            {
1763                self.generate_order_modify_rejected(
1764                    order.trader_id(),
1765                    order.strategy_id(),
1766                    order.instrument_id(),
1767                    order.client_order_id(),
1768                    Ustr::from(
1769                        format!(
1770                            "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1771                            order.order_type(),
1772                            order.order_side(),
1773                            trigger_price,
1774                            self.core
1775                                .bid
1776                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
1777                            self.core
1778                                .ask
1779                                .map_or_else(|| "None".to_string(), |p| p.to_string())
1780                        )
1781                        .as_str(),
1782                    ),
1783                    order.venue_order_id(),
1784                    order.account_id(),
1785                );
1786                return;
1787            }
1788        }
1789
1790        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1791    }
1792
1793    fn update_market_if_touched_order(
1794        &mut self,
1795        order: &mut OrderAny,
1796        quantity: Quantity,
1797        trigger_price: Price,
1798    ) {
1799        if self
1800            .core
1801            .is_touch_triggered(order.order_side_specified(), trigger_price)
1802        {
1803            self.generate_order_modify_rejected(
1804                order.trader_id(),
1805                order.strategy_id(),
1806                order.instrument_id(),
1807                order.client_order_id(),
1808                Ustr::from(
1809                    format!(
1810                        "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1811                        order.order_type(),
1812                        order.order_side(),
1813                        trigger_price,
1814                        self.core
1815                            .bid
1816                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1817                        self.core
1818                            .ask
1819                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1820                    )
1821                    .as_str(),
1822                ),
1823                order.venue_order_id(),
1824                order.account_id(),
1825            );
1826            // Cannot update order
1827            return;
1828        }
1829
1830        self.generate_order_updated(order, quantity, None, Some(trigger_price));
1831    }
1832
1833    fn update_limit_if_touched_order(
1834        &mut self,
1835        order: &mut OrderAny,
1836        quantity: Quantity,
1837        price: Price,
1838        trigger_price: Price,
1839    ) {
1840        if order.is_triggered().is_some_and(|t| t) {
1841            // Update limit price
1842            if self
1843                .core
1844                .is_limit_matched(order.order_side_specified(), price)
1845            {
1846                if order.is_post_only() {
1847                    self.generate_order_modify_rejected(
1848                        order.trader_id(),
1849                        order.strategy_id(),
1850                        order.instrument_id(),
1851                        order.client_order_id(),
1852                        Ustr::from(format!(
1853                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1854                            order.order_type(),
1855                            order.order_side(),
1856                            price,
1857                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1858                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1859                        ).as_str()),
1860                        order.venue_order_id(),
1861                        order.account_id(),
1862                    );
1863                    // Cannot update order
1864                    return;
1865                }
1866                self.generate_order_updated(order, quantity, Some(price), None);
1867                order.set_liquidity_side(LiquiditySide::Taker);
1868                self.fill_limit_order(order);
1869                return;
1870            }
1871        } else {
1872            // Update trigger price
1873            if self
1874                .core
1875                .is_touch_triggered(order.order_side_specified(), trigger_price)
1876            {
1877                self.generate_order_modify_rejected(
1878                    order.trader_id(),
1879                    order.strategy_id(),
1880                    order.instrument_id(),
1881                    order.client_order_id(),
1882                    Ustr::from(
1883                        format!(
1884                            "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1885                            order.order_type(),
1886                            order.order_side(),
1887                            trigger_price,
1888                            self.core
1889                                .bid
1890                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
1891                            self.core
1892                                .ask
1893                                .map_or_else(|| "None".to_string(), |p| p.to_string())
1894                        )
1895                        .as_str(),
1896                    ),
1897                    order.venue_order_id(),
1898                    order.account_id(),
1899                );
1900                return;
1901            }
1902        }
1903
1904        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1905    }
1906
1907    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1908        let (new_trigger_price, new_price) = trailing_stop_calculate(
1909            self.instrument.price_increment(),
1910            order,
1911            self.core.bid,
1912            self.core.ask,
1913            self.core.last,
1914        )
1915        .unwrap();
1916
1917        if new_trigger_price.is_none() && new_price.is_none() {
1918            return;
1919        }
1920
1921        self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price);
1922    }
1923
1924    // -- EVENT HANDLING -----------------------------------------------------
1925
1926    fn accept_order(&mut self, order: &mut OrderAny) {
1927        if order.is_closed() {
1928            // Temporary guard to prevent invalid processing
1929            return;
1930        }
1931        if order.status() != OrderStatus::Accepted {
1932            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1933            self.generate_order_accepted(order, venue_order_id);
1934
1935            if matches!(
1936                order.order_type(),
1937                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
1938            ) && order.trigger_price().is_none()
1939            {
1940                self.update_trailing_stop_order(order);
1941            }
1942        }
1943
1944        let _ = self.core.add_order(order.to_owned().into());
1945    }
1946
1947    fn expire_order(&mut self, order: &PassiveOrderAny) {
1948        if self.config.support_contingent_orders
1949            && order
1950                .contingency_type()
1951                .is_some_and(|c| c != ContingencyType::NoContingency)
1952        {
1953            self.cancel_contingent_orders(&OrderAny::from(order.clone()));
1954        }
1955
1956        self.generate_order_expired(&order.to_any());
1957    }
1958
1959    fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
1960        let cancel_contingencies = cancel_contingencies.unwrap_or(true);
1961        if order.is_active_local() {
1962            log::error!(
1963                "Cannot cancel an order with {} from the matching engine",
1964                order.status()
1965            );
1966            return;
1967        }
1968
1969        // Check if order exists in OrderMatching core, and delete it if it does
1970        if self.core.order_exists(order.client_order_id()) {
1971            let _ = self
1972                .core
1973                .delete_order(&PassiveOrderAny::from(order.clone()));
1974        }
1975        self.cached_filled_qty.remove(&order.client_order_id());
1976
1977        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1978        self.generate_order_canceled(order, venue_order_id);
1979
1980        if self.config.support_contingent_orders
1981            && order.contingency_type().is_some()
1982            && order.contingency_type().unwrap() != ContingencyType::NoContingency
1983            && cancel_contingencies
1984        {
1985            self.cancel_contingent_orders(order);
1986        }
1987    }
1988
1989    fn update_order(
1990        &mut self,
1991        order: &mut OrderAny,
1992        quantity: Option<Quantity>,
1993        price: Option<Price>,
1994        trigger_price: Option<Price>,
1995        update_contingencies: Option<bool>,
1996    ) {
1997        let update_contingencies = update_contingencies.unwrap_or(true);
1998        let quantity = quantity.unwrap_or(order.quantity());
1999
2000        match order {
2001            OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2002                let price = price.unwrap_or(order.price().unwrap());
2003                self.update_limit_order(order, quantity, price);
2004            }
2005            OrderAny::StopMarket(_) => {
2006                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2007                self.update_stop_market_order(order, quantity, trigger_price);
2008            }
2009            OrderAny::StopLimit(_) => {
2010                let price = price.unwrap_or(order.price().unwrap());
2011                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2012                self.update_stop_limit_order(order, quantity, price, trigger_price);
2013            }
2014            OrderAny::MarketIfTouched(_) => {
2015                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2016                self.update_market_if_touched_order(order, quantity, trigger_price);
2017            }
2018            OrderAny::LimitIfTouched(_) => {
2019                let price = price.unwrap_or(order.price().unwrap());
2020                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2021                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2022            }
2023            OrderAny::TrailingStopMarket(_) => {
2024                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2025                self.update_market_if_touched_order(order, quantity, trigger_price);
2026            }
2027            OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2028                let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2029                let trigger_price =
2030                    trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2031                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2032            }
2033            _ => {
2034                panic!(
2035                    "Unsupported order type {} for update_order",
2036                    order.order_type()
2037                );
2038            }
2039        }
2040
2041        if self.config.support_contingent_orders
2042            && order
2043                .contingency_type()
2044                .is_some_and(|c| c != ContingencyType::NoContingency)
2045            && update_contingencies
2046        {
2047            self.update_contingent_order(order);
2048        }
2049    }
2050
2051    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
2052        todo!("trigger_stop_order")
2053    }
2054
2055    fn update_contingent_order(&mut self, order: &OrderAny) {
2056        log::debug!("Updating OUO orders from {}", order.client_order_id());
2057        if let Some(linked_order_ids) = order.linked_order_ids() {
2058            for client_order_id in linked_order_ids {
2059                let mut child_order = match self.cache.borrow().order(client_order_id) {
2060                    Some(order) => order.clone(),
2061                    None => panic!("Order {client_order_id} not found in cache."),
2062                };
2063
2064                if child_order.is_active_local() {
2065                    continue;
2066                }
2067
2068                if order.leaves_qty().is_zero() {
2069                    self.cancel_order(&child_order, None);
2070                } else if child_order.leaves_qty() != order.leaves_qty() {
2071                    let price = child_order.price();
2072                    let trigger_price = child_order.trigger_price();
2073                    self.update_order(
2074                        &mut child_order,
2075                        Some(order.leaves_qty()),
2076                        price,
2077                        trigger_price,
2078                        Some(false),
2079                    );
2080                }
2081            }
2082        }
2083    }
2084
2085    fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2086        if let Some(linked_order_ids) = order.linked_order_ids() {
2087            for client_order_id in linked_order_ids {
2088                let contingent_order = match self.cache.borrow().order(client_order_id) {
2089                    Some(order) => order.clone(),
2090                    None => panic!("Cannot find contingent order for {client_order_id}"),
2091                };
2092                if contingent_order.is_active_local() {
2093                    // order is not on the exchange yet
2094                    continue;
2095                }
2096                if !contingent_order.is_closed() {
2097                    self.cancel_order(&contingent_order, Some(false));
2098                }
2099            }
2100        }
2101    }
2102
2103    // -- EVENT GENERATORS -----------------------------------------------------
2104
2105    fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2106        let ts_now = self.clock.borrow().timestamp_ns();
2107        let account_id = order
2108            .account_id()
2109            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2110
2111        let event = OrderEventAny::Rejected(OrderRejected::new(
2112            order.trader_id(),
2113            order.strategy_id(),
2114            order.instrument_id(),
2115            order.client_order_id(),
2116            account_id,
2117            reason,
2118            UUID4::new(),
2119            ts_now,
2120            ts_now,
2121            false,
2122        ));
2123        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2124    }
2125
2126    fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2127        let ts_now = self.clock.borrow().timestamp_ns();
2128        let account_id = order
2129            .account_id()
2130            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2131        let event = OrderEventAny::Accepted(OrderAccepted::new(
2132            order.trader_id(),
2133            order.strategy_id(),
2134            order.instrument_id(),
2135            order.client_order_id(),
2136            venue_order_id,
2137            account_id,
2138            UUID4::new(),
2139            ts_now,
2140            ts_now,
2141            false,
2142        ));
2143        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2144
2145        // TODO remove this when execution engine msgbus handlers are correctly set
2146        order.apply(event).expect("Failed to apply order event");
2147    }
2148
2149    #[allow(clippy::too_many_arguments)]
2150    fn generate_order_modify_rejected(
2151        &self,
2152        trader_id: TraderId,
2153        strategy_id: StrategyId,
2154        instrument_id: InstrumentId,
2155        client_order_id: ClientOrderId,
2156        reason: Ustr,
2157        venue_order_id: Option<VenueOrderId>,
2158        account_id: Option<AccountId>,
2159    ) {
2160        let ts_now = self.clock.borrow().timestamp_ns();
2161        let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2162            trader_id,
2163            strategy_id,
2164            instrument_id,
2165            client_order_id,
2166            reason,
2167            UUID4::new(),
2168            ts_now,
2169            ts_now,
2170            false,
2171            venue_order_id,
2172            account_id,
2173        ));
2174        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2175    }
2176
2177    #[allow(clippy::too_many_arguments)]
2178    fn generate_order_cancel_rejected(
2179        &self,
2180        trader_id: TraderId,
2181        strategy_id: StrategyId,
2182        account_id: AccountId,
2183        instrument_id: InstrumentId,
2184        client_order_id: ClientOrderId,
2185        venue_order_id: VenueOrderId,
2186        reason: Ustr,
2187    ) {
2188        let ts_now = self.clock.borrow().timestamp_ns();
2189        let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
2190            trader_id,
2191            strategy_id,
2192            instrument_id,
2193            client_order_id,
2194            reason,
2195            UUID4::new(),
2196            ts_now,
2197            ts_now,
2198            false,
2199            Some(venue_order_id),
2200            Some(account_id),
2201        ));
2202        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2203    }
2204
2205    fn generate_order_updated(
2206        &self,
2207        order: &mut OrderAny,
2208        quantity: Quantity,
2209        price: Option<Price>,
2210        trigger_price: Option<Price>,
2211    ) {
2212        let ts_now = self.clock.borrow().timestamp_ns();
2213        let event = OrderEventAny::Updated(OrderUpdated::new(
2214            order.trader_id(),
2215            order.strategy_id(),
2216            order.instrument_id(),
2217            order.client_order_id(),
2218            quantity,
2219            UUID4::new(),
2220            ts_now,
2221            ts_now,
2222            false,
2223            order.venue_order_id(),
2224            order.account_id(),
2225            price,
2226            trigger_price,
2227        ));
2228        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2229
2230        // TODO remove this when execution engine msgbus handlers are correctly set
2231        order.apply(event).expect("Failed to apply order event");
2232    }
2233
2234    fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
2235        let ts_now = self.clock.borrow().timestamp_ns();
2236        let event = OrderEventAny::Canceled(OrderCanceled::new(
2237            order.trader_id(),
2238            order.strategy_id(),
2239            order.instrument_id(),
2240            order.client_order_id(),
2241            UUID4::new(),
2242            ts_now,
2243            ts_now,
2244            false,
2245            Some(venue_order_id),
2246            order.account_id(),
2247        ));
2248        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2249    }
2250
2251    fn generate_order_triggered(&self, order: &OrderAny) {
2252        let ts_now = self.clock.borrow().timestamp_ns();
2253        let event = OrderEventAny::Triggered(OrderTriggered::new(
2254            order.trader_id(),
2255            order.strategy_id(),
2256            order.instrument_id(),
2257            order.client_order_id(),
2258            UUID4::new(),
2259            ts_now,
2260            ts_now,
2261            false,
2262            order.venue_order_id(),
2263            order.account_id(),
2264        ));
2265        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2266    }
2267
2268    fn generate_order_expired(&self, order: &OrderAny) {
2269        let ts_now = self.clock.borrow().timestamp_ns();
2270        let event = OrderEventAny::Expired(OrderExpired::new(
2271            order.trader_id(),
2272            order.strategy_id(),
2273            order.instrument_id(),
2274            order.client_order_id(),
2275            UUID4::new(),
2276            ts_now,
2277            ts_now,
2278            false,
2279            order.venue_order_id(),
2280            order.account_id(),
2281        ));
2282        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2283    }
2284
2285    #[allow(clippy::too_many_arguments)]
2286    fn generate_order_filled(
2287        &mut self,
2288        order: &mut OrderAny,
2289        venue_order_id: VenueOrderId,
2290        venue_position_id: Option<PositionId>,
2291        last_qty: Quantity,
2292        last_px: Price,
2293        quote_currency: Currency,
2294        commission: Money,
2295        liquidity_side: LiquiditySide,
2296    ) {
2297        let ts_now = self.clock.borrow().timestamp_ns();
2298        let account_id = order
2299            .account_id()
2300            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2301        let event = OrderEventAny::Filled(OrderFilled::new(
2302            order.trader_id(),
2303            order.strategy_id(),
2304            order.instrument_id(),
2305            order.client_order_id(),
2306            venue_order_id,
2307            account_id,
2308            self.ids_generator.generate_trade_id(),
2309            order.order_side(),
2310            order.order_type(),
2311            last_qty,
2312            last_px,
2313            quote_currency,
2314            liquidity_side,
2315            UUID4::new(),
2316            ts_now,
2317            ts_now,
2318            false,
2319            venue_position_id,
2320            Some(commission),
2321        ));
2322        msgbus::send(&Ustr::from("ExecEngine.process"), &event as &dyn Any);
2323
2324        // TODO remove this when execution engine msgbus handlers are correctly set
2325        order.apply(event).expect("Failed to apply order event");
2326    }
2327}