nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, VecDeque},
25    fmt::Debug,
26    rc::Rc,
27};
28
29use ahash::AHashMap;
30use nautilus_common::{
31    cache::Cache, clients::ExecutionClient, clock::Clock, messages::execution::TradingCommand,
32};
33use nautilus_core::{
34    UnixNanos,
35    correctness::{FAILED, check_equal},
36};
37use nautilus_execution::{
38    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
39    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
40};
41use nautilus_model::{
42    accounts::AccountAny,
43    data::{
44        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
45        QuoteTick, TradeTick,
46    },
47    enums::{AccountType, BookType, OmsType},
48    identifiers::{InstrumentId, Venue},
49    instruments::{Instrument, InstrumentAny},
50    orderbook::OrderBook,
51    orders::PassiveOrderAny,
52    types::{AccountBalance, Currency, Money, Price},
53};
54use rust_decimal::Decimal;
55
56use crate::modules::SimulationModule;
57
58/// Represents commands with simulated network latency in a min-heap priority queue.
59/// The commands are ordered by timestamp for FIFO processing, with the
60/// earliest timestamp having the highest priority in the queue.
61#[derive(Debug, Eq, PartialEq)]
62struct InflightCommand {
63    timestamp: UnixNanos,
64    counter: u32,
65    command: TradingCommand,
66}
67
68impl InflightCommand {
69    const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
70        Self {
71            timestamp,
72            counter,
73            command,
74        }
75    }
76}
77
78impl Ord for InflightCommand {
79    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
80        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
81        other
82            .timestamp
83            .cmp(&self.timestamp)
84            .then_with(|| other.counter.cmp(&self.counter))
85    }
86}
87
88impl PartialOrd for InflightCommand {
89    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
90        Some(self.cmp(other))
91    }
92}
93
94/// Simulated exchange venue for realistic trading execution during backtesting.
95///
96/// The `SimulatedExchange` provides a comprehensive simulation of a trading venue,
97/// including order matching engines, account management, and realistic execution
98/// models. It maintains order books, processes market data, and executes trades
99/// with configurable latency and fill models to accurately simulate real market
100/// conditions during backtesting.
101///
102/// Key features:
103/// - Multi-instrument order matching with realistic execution
104/// - Configurable fee, fill, and latency models
105/// - Support for various order types and execution options
106/// - Account balance and position management
107/// - Market data processing and order book maintenance
108/// - Simulation modules for custom venue behaviors
109pub struct SimulatedExchange {
110    pub id: Venue,
111    pub oms_type: OmsType,
112    pub account_type: AccountType,
113    starting_balances: Vec<Money>,
114    book_type: BookType,
115    default_leverage: Decimal,
116    exec_client: Option<Rc<dyn ExecutionClient>>,
117    pub base_currency: Option<Currency>,
118    fee_model: FeeModelAny,
119    fill_model: FillModel,
120    latency_model: Option<Box<dyn LatencyModel>>,
121    instruments: AHashMap<InstrumentId, InstrumentAny>,
122    matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
123    leverages: AHashMap<InstrumentId, Decimal>,
124    modules: Vec<Box<dyn SimulationModule>>,
125    clock: Rc<RefCell<dyn Clock>>,
126    cache: Rc<RefCell<Cache>>,
127    message_queue: VecDeque<TradingCommand>,
128    inflight_queue: BinaryHeap<InflightCommand>,
129    inflight_counter: AHashMap<UnixNanos, u32>,
130    bar_execution: bool,
131    trade_execution: bool,
132    liquidity_consumption: bool,
133    reject_stop_orders: bool,
134    support_gtd_orders: bool,
135    support_contingent_orders: bool,
136    use_position_ids: bool,
137    use_random_ids: bool,
138    use_reduce_only: bool,
139    use_message_queue: bool,
140    allow_cash_borrowing: bool,
141    frozen_account: bool,
142    price_protection_points: u32,
143}
144
145impl Debug for SimulatedExchange {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct(stringify!(SimulatedExchange))
148            .field("id", &self.id)
149            .field("account_type", &self.account_type)
150            .finish()
151    }
152}
153
154impl SimulatedExchange {
155    /// Creates a new [`SimulatedExchange`] instance.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if:
160    /// - `starting_balances` is empty.
161    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
162    #[allow(clippy::too_many_arguments)]
163    pub fn new(
164        venue: Venue,
165        oms_type: OmsType,
166        account_type: AccountType,
167        starting_balances: Vec<Money>,
168        base_currency: Option<Currency>,
169        default_leverage: Decimal,
170        leverages: AHashMap<InstrumentId, Decimal>,
171        modules: Vec<Box<dyn SimulationModule>>,
172        cache: Rc<RefCell<Cache>>,
173        clock: Rc<RefCell<dyn Clock>>,
174        fill_model: FillModel,
175        fee_model: FeeModelAny,
176        book_type: BookType,
177        latency_model: Option<Box<dyn LatencyModel>>,
178        bar_execution: Option<bool>,
179        trade_execution: Option<bool>,
180        liquidity_consumption: Option<bool>,
181        reject_stop_orders: Option<bool>,
182        support_gtd_orders: Option<bool>,
183        support_contingent_orders: Option<bool>,
184        use_position_ids: Option<bool>,
185        use_random_ids: Option<bool>,
186        use_reduce_only: Option<bool>,
187        use_message_queue: Option<bool>,
188        allow_cash_borrowing: Option<bool>,
189        frozen_account: Option<bool>,
190        price_protection_points: Option<u32>,
191    ) -> anyhow::Result<Self> {
192        if starting_balances.is_empty() {
193            anyhow::bail!("Starting balances must be provided")
194        }
195        if base_currency.is_some() && starting_balances.len() > 1 {
196            anyhow::bail!("single-currency account has multiple starting currencies")
197        }
198        // TODO register and load modules
199        Ok(Self {
200            id: venue,
201            oms_type,
202            account_type,
203            starting_balances,
204            book_type,
205            default_leverage,
206            exec_client: None,
207            base_currency,
208            fee_model,
209            fill_model,
210            latency_model,
211            instruments: AHashMap::new(),
212            matching_engines: AHashMap::new(),
213            leverages,
214            modules,
215            clock,
216            cache,
217            message_queue: VecDeque::new(),
218            inflight_queue: BinaryHeap::new(),
219            inflight_counter: AHashMap::new(),
220            bar_execution: bar_execution.unwrap_or(true),
221            trade_execution: trade_execution.unwrap_or(true),
222            liquidity_consumption: liquidity_consumption.unwrap_or(true),
223            reject_stop_orders: reject_stop_orders.unwrap_or(true),
224            support_gtd_orders: support_gtd_orders.unwrap_or(true),
225            support_contingent_orders: support_contingent_orders.unwrap_or(true),
226            use_position_ids: use_position_ids.unwrap_or(true),
227            use_random_ids: use_random_ids.unwrap_or(false),
228            use_reduce_only: use_reduce_only.unwrap_or(true),
229            use_message_queue: use_message_queue.unwrap_or(true),
230            allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
231            frozen_account: frozen_account.unwrap_or(false),
232            price_protection_points: price_protection_points.unwrap_or(0),
233        })
234    }
235
236    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
237        self.exec_client = Some(client);
238    }
239
240    pub fn set_fill_model(&mut self, fill_model: FillModel) {
241        for matching_engine in self.matching_engines.values_mut() {
242            matching_engine.set_fill_model(fill_model.clone());
243            log::info!(
244                "Setting fill model for {} to {}",
245                matching_engine.venue,
246                self.fill_model
247            );
248        }
249        self.fill_model = fill_model;
250    }
251
252    pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
253        self.latency_model = Some(latency_model);
254    }
255
256    pub fn initialize_account(&mut self) {
257        self.generate_fresh_account_state();
258    }
259
260    /// Adds an instrument to the simulated exchange and initializes its matching engine.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
265    ///
266    /// # Panics
267    ///
268    /// Panics if the instrument cannot be added to the exchange.
269    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
270        check_equal(
271            &instrument.id().venue,
272            &self.id,
273            "Venue of instrument id",
274            "Venue of simulated exchange",
275        )
276        .expect(FAILED);
277
278        if self.account_type == AccountType::Cash
279            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
280                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
281        {
282            anyhow::bail!("Cash account cannot trade futures or perpetuals")
283        }
284
285        self.instruments.insert(instrument.id(), instrument.clone());
286
287        let price_protection = if self.price_protection_points == 0 {
288            None
289        } else {
290            Some(self.price_protection_points)
291        };
292
293        let matching_engine_config = OrderMatchingEngineConfig::new(
294            self.bar_execution,
295            self.trade_execution,
296            self.liquidity_consumption,
297            self.reject_stop_orders,
298            self.support_gtd_orders,
299            self.support_contingent_orders,
300            self.use_position_ids,
301            self.use_random_ids,
302            self.use_reduce_only,
303        )
304        .with_price_protection_points(price_protection);
305        let instrument_id = instrument.id();
306        let matching_engine = OrderMatchingEngine::new(
307            instrument,
308            self.instruments.len() as u32,
309            self.fill_model.clone(),
310            self.fee_model.clone(),
311            self.book_type,
312            self.oms_type,
313            self.account_type,
314            self.clock.clone(),
315            Rc::clone(&self.cache),
316            matching_engine_config,
317        );
318        self.matching_engines.insert(instrument_id, matching_engine);
319
320        log::info!("Added instrument {instrument_id} and created matching engine");
321        Ok(())
322    }
323
324    #[must_use]
325    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
326        self.matching_engines
327            .get(&instrument_id)
328            .and_then(OrderMatchingEngine::best_bid_price)
329    }
330
331    #[must_use]
332    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
333        self.matching_engines
334            .get(&instrument_id)
335            .and_then(OrderMatchingEngine::best_ask_price)
336    }
337
338    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
339        self.matching_engines
340            .get(&instrument_id)
341            .map(OrderMatchingEngine::get_book)
342    }
343
344    #[must_use]
345    pub fn get_matching_engine(
346        &self,
347        instrument_id: &InstrumentId,
348    ) -> Option<&OrderMatchingEngine> {
349        self.matching_engines.get(instrument_id)
350    }
351
352    #[must_use]
353    pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
354        &self.matching_engines
355    }
356
357    #[must_use]
358    pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
359        let mut books = AHashMap::new();
360        for (instrument_id, matching_engine) in &self.matching_engines {
361            books.insert(*instrument_id, matching_engine.get_book().clone());
362        }
363        books
364    }
365
366    #[must_use]
367    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
368        instrument_id
369            .and_then(|id| {
370                self.matching_engines
371                    .get(&id)
372                    .map(OrderMatchingEngine::get_open_orders)
373            })
374            .unwrap_or_else(|| {
375                self.matching_engines
376                    .values()
377                    .flat_map(OrderMatchingEngine::get_open_orders)
378                    .collect()
379            })
380    }
381
382    #[must_use]
383    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
384        instrument_id
385            .and_then(|id| {
386                self.matching_engines
387                    .get(&id)
388                    .map(|engine| engine.get_open_bid_orders().to_vec())
389            })
390            .unwrap_or_else(|| {
391                self.matching_engines
392                    .values()
393                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
394                    .collect()
395            })
396    }
397
398    #[must_use]
399    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
400        instrument_id
401            .and_then(|id| {
402                self.matching_engines
403                    .get(&id)
404                    .map(|engine| engine.get_open_ask_orders().to_vec())
405            })
406            .unwrap_or_else(|| {
407                self.matching_engines
408                    .values()
409                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
410                    .collect()
411            })
412    }
413
414    /// # Panics
415    ///
416    /// Panics if retrieving the account from the execution client fails.
417    #[must_use]
418    pub fn get_account(&self) -> Option<AccountAny> {
419        self.exec_client
420            .as_ref()
421            .map(|client| client.get_account().unwrap())
422    }
423
424    /// # Panics
425    ///
426    /// Panics if generating account state fails during adjustment.
427    pub fn adjust_account(&mut self, adjustment: Money) {
428        if self.frozen_account {
429            // Nothing to adjust
430            return;
431        }
432
433        if let Some(exec_client) = &self.exec_client {
434            let venue = exec_client.venue();
435            println!("Adjusting account for venue {venue}");
436            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
437                match account.balance(Some(adjustment.currency)) {
438                    Some(balance) => {
439                        let mut current_balance = *balance;
440                        current_balance.total += adjustment;
441                        current_balance.free += adjustment;
442
443                        let margins = match account {
444                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
445                            _ => AHashMap::new(),
446                        };
447
448                        if let Some(exec_client) = &self.exec_client {
449                            exec_client
450                                .generate_account_state(
451                                    vec![current_balance],
452                                    margins.values().copied().collect(),
453                                    true,
454                                    self.clock.borrow().timestamp_ns(),
455                                )
456                                .unwrap();
457                        }
458                    }
459                    None => {
460                        log::error!(
461                            "Cannot adjust account: no balance for currency {}",
462                            adjustment.currency
463                        );
464                    }
465                }
466            } else {
467                log::error!("Cannot adjust account: no account for venue {venue}");
468            }
469        }
470    }
471
472    pub fn send(&mut self, command: TradingCommand) {
473        if !self.use_message_queue {
474            self.process_trading_command(command);
475        } else if self.latency_model.is_none() {
476            self.message_queue.push_back(command);
477        } else {
478            let (timestamp, counter) = self.generate_inflight_command(&command);
479            self.inflight_queue
480                .push(InflightCommand::new(timestamp, counter, command));
481        }
482    }
483
484    /// # Panics
485    ///
486    /// Panics if the command is invalid when generating inflight command.
487    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
488        if let Some(latency_model) = &self.latency_model {
489            let ts = match command {
490                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
491                    command.ts_init() + latency_model.get_insert_latency()
492                }
493                TradingCommand::ModifyOrder(_) => {
494                    command.ts_init() + latency_model.get_update_latency()
495                }
496                TradingCommand::CancelOrder(_)
497                | TradingCommand::CancelAllOrders(_)
498                | TradingCommand::BatchCancelOrders(_) => {
499                    command.ts_init() + latency_model.get_delete_latency()
500                }
501                _ => panic!("Cannot handle command: {command:?}"),
502            };
503
504            let counter = self
505                .inflight_counter
506                .entry(ts)
507                .and_modify(|e| *e += 1)
508                .or_insert(1);
509
510            (ts, *counter)
511        } else {
512            panic!("Latency model should be initialized");
513        }
514    }
515
516    /// # Panics
517    ///
518    /// Panics if adding a missing instrument during delta processing fails.
519    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
520        for module in &self.modules {
521            module.pre_process(Data::Delta(delta));
522        }
523
524        if !self.matching_engines.contains_key(&delta.instrument_id) {
525            let instrument = {
526                let cache = self.cache.as_ref().borrow();
527                cache.instrument(&delta.instrument_id).cloned()
528            };
529
530            if let Some(instrument) = instrument {
531                self.add_instrument(instrument).unwrap();
532            } else {
533                panic!(
534                    "No matching engine found for instrument {}",
535                    delta.instrument_id
536                );
537            }
538        }
539
540        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
541            matching_engine.process_order_book_delta(&delta).unwrap();
542        } else {
543            panic!("Matching engine should be initialized");
544        }
545    }
546
547    /// # Panics
548    ///
549    /// Panics if adding a missing instrument during deltas processing fails.
550    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
551        for module in &self.modules {
552            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
553        }
554
555        if !self.matching_engines.contains_key(&deltas.instrument_id) {
556            let instrument = {
557                let cache = self.cache.as_ref().borrow();
558                cache.instrument(&deltas.instrument_id).cloned()
559            };
560
561            if let Some(instrument) = instrument {
562                self.add_instrument(instrument).unwrap();
563            } else {
564                panic!(
565                    "No matching engine found for instrument {}",
566                    deltas.instrument_id
567                );
568            }
569        }
570
571        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
572            matching_engine.process_order_book_deltas(&deltas).unwrap();
573        } else {
574            panic!("Matching engine should be initialized");
575        }
576    }
577
578    /// # Panics
579    ///
580    /// Panics if adding a missing instrument during quote tick processing fails.
581    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
582        for module in &self.modules {
583            module.pre_process(Data::Quote(quote.to_owned()));
584        }
585
586        if !self.matching_engines.contains_key(&quote.instrument_id) {
587            let instrument = {
588                let cache = self.cache.as_ref().borrow();
589                cache.instrument(&quote.instrument_id).cloned()
590            };
591
592            if let Some(instrument) = instrument {
593                self.add_instrument(instrument).unwrap();
594            } else {
595                panic!(
596                    "No matching engine found for instrument {}",
597                    quote.instrument_id
598                );
599            }
600        }
601
602        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
603            matching_engine.process_quote_tick(quote);
604        } else {
605            panic!("Matching engine should be initialized");
606        }
607    }
608
609    /// # Panics
610    ///
611    /// Panics if adding a missing instrument during trade tick processing fails.
612    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
613        for module in &self.modules {
614            module.pre_process(Data::Trade(trade.to_owned()));
615        }
616
617        if !self.matching_engines.contains_key(&trade.instrument_id) {
618            let instrument = {
619                let cache = self.cache.as_ref().borrow();
620                cache.instrument(&trade.instrument_id).cloned()
621            };
622
623            if let Some(instrument) = instrument {
624                self.add_instrument(instrument).unwrap();
625            } else {
626                panic!(
627                    "No matching engine found for instrument {}",
628                    trade.instrument_id
629                );
630            }
631        }
632
633        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
634            matching_engine.process_trade_tick(trade);
635        } else {
636            panic!("Matching engine should be initialized");
637        }
638    }
639
640    /// # Panics
641    ///
642    /// Panics if adding a missing instrument during bar processing fails.
643    pub fn process_bar(&mut self, bar: Bar) {
644        for module in &self.modules {
645            module.pre_process(Data::Bar(bar));
646        }
647
648        if !self.matching_engines.contains_key(&bar.instrument_id()) {
649            let instrument = {
650                let cache = self.cache.as_ref().borrow();
651                cache.instrument(&bar.instrument_id()).cloned()
652            };
653
654            if let Some(instrument) = instrument {
655                self.add_instrument(instrument).unwrap();
656            } else {
657                panic!(
658                    "No matching engine found for instrument {}",
659                    bar.instrument_id()
660                );
661            }
662        }
663
664        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
665            matching_engine.process_bar(&bar);
666        } else {
667            panic!("Matching engine should be initialized");
668        }
669    }
670
671    /// # Panics
672    ///
673    /// Panics if adding a missing instrument during instrument status processing fails.
674    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
675        // TODO add module preprocessing
676
677        if !self.matching_engines.contains_key(&status.instrument_id) {
678            let instrument = {
679                let cache = self.cache.as_ref().borrow();
680                cache.instrument(&status.instrument_id).cloned()
681            };
682
683            if let Some(instrument) = instrument {
684                self.add_instrument(instrument).unwrap();
685            } else {
686                panic!(
687                    "No matching engine found for instrument {}",
688                    status.instrument_id
689                );
690            }
691        }
692
693        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
694            matching_engine.process_status(status.action);
695        } else {
696            panic!("Matching engine should be initialized");
697        }
698    }
699
700    /// # Panics
701    ///
702    /// Panics if popping an inflight command fails during processing.
703    pub fn process(&mut self, ts_now: UnixNanos) {
704        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
705
706        // Process inflight commands
707        while let Some(inflight) = self.inflight_queue.peek() {
708            if inflight.timestamp > ts_now {
709                // Future commands remain in the queue
710                break;
711            }
712            // We get the inflight command, remove it from the queue and process it
713            let inflight = self.inflight_queue.pop().unwrap();
714            self.process_trading_command(inflight.command);
715        }
716
717        // Process regular message queue
718        while let Some(command) = self.message_queue.pop_front() {
719            self.process_trading_command(command);
720        }
721    }
722
723    pub fn reset(&mut self) {
724        for module in &self.modules {
725            module.reset();
726        }
727
728        self.generate_fresh_account_state();
729
730        for matching_engine in self.matching_engines.values_mut() {
731            matching_engine.reset();
732        }
733
734        // TODO Clear the inflight and message queues
735        log::info!("Resetting exchange state");
736    }
737
738    /// # Panics
739    ///
740    /// Panics if execution client is uninitialized when processing trading command.
741    pub fn process_trading_command(&mut self, command: TradingCommand) {
742        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
743            let account_id = if let Some(exec_client) = &self.exec_client {
744                exec_client.account_id()
745            } else {
746                panic!("Execution client should be initialized");
747            };
748            match command {
749                TradingCommand::SubmitOrder(mut command) => {
750                    matching_engine.process_order(&mut command.order, account_id);
751                }
752                TradingCommand::ModifyOrder(ref command) => {
753                    matching_engine.process_modify(command, account_id);
754                }
755                TradingCommand::CancelOrder(ref command) => {
756                    matching_engine.process_cancel(command, account_id);
757                }
758                TradingCommand::CancelAllOrders(ref command) => {
759                    matching_engine.process_cancel_all(command, account_id);
760                }
761                TradingCommand::BatchCancelOrders(ref command) => {
762                    matching_engine.process_batch_cancel(command, account_id);
763                }
764                TradingCommand::SubmitOrderList(mut command) => {
765                    for order in &mut command.order_list.orders {
766                        matching_engine.process_order(order, account_id);
767                    }
768                }
769                _ => {}
770            }
771        } else {
772            panic!(
773                "Matching engine not found for instrument {}",
774                command.instrument_id()
775            );
776        }
777    }
778
779    /// # Panics
780    ///
781    /// Panics if generating fresh account state fails.
782    pub fn generate_fresh_account_state(&self) {
783        let balances: Vec<AccountBalance> = self
784            .starting_balances
785            .iter()
786            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
787            .collect();
788
789        if let Some(exec_client) = &self.exec_client {
790            exec_client
791                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
792                .unwrap();
793        }
794
795        // Set leverages
796        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
797            margin_account.set_default_leverage(self.default_leverage);
798
799            // Set instrument specific leverages
800            for (instrument_id, leverage) in &self.leverages {
801                margin_account.set_leverage(*instrument_id, *leverage);
802            }
803        }
804    }
805}
806
807#[cfg(test)]
808mod tests {
809    use std::{cell::RefCell, collections::BinaryHeap, rc::Rc};
810
811    use ahash::AHashMap;
812    use nautilus_common::{
813        cache::Cache,
814        clock::TestClock,
815        messages::execution::{SubmitOrder, TradingCommand},
816        msgbus::{
817            self,
818            stubs::{get_message_saving_handler, get_saved_messages},
819        },
820    };
821    use nautilus_core::{UUID4, UnixNanos};
822    use nautilus_execution::models::{
823        fee::{FeeModelAny, MakerTakerFeeModel},
824        fill::FillModel,
825        latency::StaticLatencyModel,
826    };
827    use nautilus_model::{
828        accounts::{AccountAny, MarginAccount},
829        data::{
830            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
831            TradeTick,
832        },
833        enums::{
834            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
835            OmsType, OrderSide, OrderType,
836        },
837        events::AccountState,
838        identifiers::{AccountId, InstrumentId, StrategyId, TradeId, TraderId, Venue},
839        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
840        orders::OrderTestBuilder,
841        stubs::TestDefault,
842        types::{AccountBalance, Currency, Money, Price, Quantity},
843    };
844    use rstest::rstest;
845
846    use crate::{
847        exchange::{InflightCommand, SimulatedExchange},
848        execution_client::BacktestExecutionClient,
849    };
850
851    fn get_exchange(
852        venue: Venue,
853        account_type: AccountType,
854        book_type: BookType,
855        cache: Option<Rc<RefCell<Cache>>>,
856    ) -> Rc<RefCell<SimulatedExchange>> {
857        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
858        let clock = Rc::new(RefCell::new(TestClock::new()));
859        let exchange = Rc::new(RefCell::new(
860            SimulatedExchange::new(
861                venue,
862                OmsType::Netting,
863                account_type,
864                vec![Money::new(1000.0, Currency::USD())],
865                None,
866                1.into(),
867                AHashMap::new(),
868                vec![],
869                cache.clone(),
870                clock,
871                FillModel::default(),
872                FeeModelAny::MakerTaker(MakerTakerFeeModel),
873                book_type,
874                None, // latency_model
875                None, // bar_execution
876                None, // trade_execution
877                None, // liquidity_consumption
878                None, // reject_stop_orders
879                None, // support_gtd_orders
880                None, // support_contingent_orders
881                None, // use_position_ids
882                None, // use_random_ids
883                None, // use_reduce_only
884                None, // use_message_queue
885                None, // allow_cash_borrowing
886                None, // frozen_account
887                None, // price_protection_points
888            )
889            .unwrap(),
890        ));
891
892        let clock = TestClock::new();
893        let execution_client = BacktestExecutionClient::new(
894            TraderId::test_default(),
895            AccountId::test_default(),
896            exchange.clone(),
897            cache,
898            Rc::new(RefCell::new(clock)),
899            None,
900            None,
901        );
902        exchange
903            .borrow_mut()
904            .register_client(Rc::new(execution_client));
905
906        exchange
907    }
908
909    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
910        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
911        let order = OrderTestBuilder::new(OrderType::Market)
912            .instrument_id(instrument_id)
913            .quantity(Quantity::from(1))
914            .build();
915        TradingCommand::SubmitOrder(SubmitOrder::new(
916            TraderId::test_default(),
917            None,
918            StrategyId::test_default(),
919            instrument_id,
920            order,
921            None,
922            None,
923            None, // params
924            UUID4::default(),
925            ts_init,
926        ))
927    }
928
929    #[rstest]
930    #[should_panic(
931        expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
932    )]
933    fn test_venue_mismatch_between_exchange_and_instrument(
934        crypto_perpetual_ethusdt: CryptoPerpetual,
935    ) {
936        let exchange = get_exchange(
937            Venue::new("SIM"),
938            AccountType::Margin,
939            BookType::L1_MBP,
940            None,
941        );
942        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
943        exchange.borrow_mut().add_instrument(instrument).unwrap();
944    }
945
946    #[rstest]
947    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
948    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
949        let exchange = get_exchange(
950            Venue::new("BINANCE"),
951            AccountType::Cash,
952            BookType::L1_MBP,
953            None,
954        );
955        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
956        exchange.borrow_mut().add_instrument(instrument).unwrap();
957    }
958
959    #[rstest]
960    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
961        let exchange = get_exchange(
962            Venue::new("BINANCE"),
963            AccountType::Margin,
964            BookType::L1_MBP,
965            None,
966        );
967        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
968
969        // register instrument
970        exchange.borrow_mut().add_instrument(instrument).unwrap();
971
972        // process tick
973        let quote_tick = QuoteTick::new(
974            crypto_perpetual_ethusdt.id,
975            Price::from("1000.00"),
976            Price::from("1001.00"),
977            Quantity::from("1.000"),
978            Quantity::from("1.000"),
979            UnixNanos::default(),
980            UnixNanos::default(),
981        );
982        exchange.borrow_mut().process_quote_tick(&quote_tick);
983
984        let best_bid_price = exchange
985            .borrow()
986            .best_bid_price(crypto_perpetual_ethusdt.id);
987        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
988        let best_ask_price = exchange
989            .borrow()
990            .best_ask_price(crypto_perpetual_ethusdt.id);
991        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
992    }
993
994    #[rstest]
995    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
996        let exchange = get_exchange(
997            Venue::new("BINANCE"),
998            AccountType::Margin,
999            BookType::L1_MBP,
1000            None,
1001        );
1002        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1003
1004        // register instrument
1005        exchange.borrow_mut().add_instrument(instrument).unwrap();
1006
1007        // process tick
1008        let trade_tick = TradeTick::new(
1009            crypto_perpetual_ethusdt.id,
1010            Price::from("1000.00"),
1011            Quantity::from("1.000"),
1012            AggressorSide::Buyer,
1013            TradeId::from("1"),
1014            UnixNanos::default(),
1015            UnixNanos::default(),
1016        );
1017        exchange.borrow_mut().process_trade_tick(&trade_tick);
1018
1019        let best_bid_price = exchange
1020            .borrow()
1021            .best_bid_price(crypto_perpetual_ethusdt.id);
1022        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1023        let best_ask = exchange
1024            .borrow()
1025            .best_ask_price(crypto_perpetual_ethusdt.id);
1026        assert_eq!(best_ask, Some(Price::from("1000.00")));
1027    }
1028
1029    #[rstest]
1030    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1031        let exchange = get_exchange(
1032            Venue::new("BINANCE"),
1033            AccountType::Margin,
1034            BookType::L1_MBP,
1035            None,
1036        );
1037        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1038
1039        // register instrument
1040        exchange.borrow_mut().add_instrument(instrument).unwrap();
1041
1042        // process bar
1043        let bar = Bar::new(
1044            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1045            Price::from("1500.00"),
1046            Price::from("1505.00"),
1047            Price::from("1490.00"),
1048            Price::from("1502.00"),
1049            Quantity::from("100.000"),
1050            UnixNanos::default(),
1051            UnixNanos::default(),
1052        );
1053        exchange.borrow_mut().process_bar(bar);
1054
1055        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1056        let best_bid_price = exchange
1057            .borrow()
1058            .best_bid_price(crypto_perpetual_ethusdt.id);
1059        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1060        let best_ask_price = exchange
1061            .borrow()
1062            .best_ask_price(crypto_perpetual_ethusdt.id);
1063        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1064    }
1065
1066    #[rstest]
1067    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1068        let exchange = get_exchange(
1069            Venue::new("BINANCE"),
1070            AccountType::Margin,
1071            BookType::L1_MBP,
1072            None,
1073        );
1074        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1075
1076        // register instrument
1077        exchange.borrow_mut().add_instrument(instrument).unwrap();
1078
1079        // create both bid and ask based bars
1080        // add +1 on ask to make sure it is different from bid
1081        let bar_bid = Bar::new(
1082            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1083            Price::from("1500.00"),
1084            Price::from("1505.00"),
1085            Price::from("1490.00"),
1086            Price::from("1502.00"),
1087            Quantity::from("100.000"),
1088            UnixNanos::from(1),
1089            UnixNanos::from(1),
1090        );
1091        let bar_ask = Bar::new(
1092            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1093            Price::from("1501.00"),
1094            Price::from("1506.00"),
1095            Price::from("1491.00"),
1096            Price::from("1503.00"),
1097            Quantity::from("100.000"),
1098            UnixNanos::from(1),
1099            UnixNanos::from(1),
1100        );
1101
1102        // process them
1103        exchange.borrow_mut().process_bar(bar_bid);
1104        exchange.borrow_mut().process_bar(bar_ask);
1105
1106        // current bid and ask prices will be the corresponding close of the ask and bid bar
1107        let best_bid_price = exchange
1108            .borrow()
1109            .best_bid_price(crypto_perpetual_ethusdt.id);
1110        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1111        let best_ask_price = exchange
1112            .borrow()
1113            .best_ask_price(crypto_perpetual_ethusdt.id);
1114        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1115    }
1116
1117    #[rstest]
1118    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1119        let exchange = get_exchange(
1120            Venue::new("BINANCE"),
1121            AccountType::Margin,
1122            BookType::L2_MBP,
1123            None,
1124        );
1125        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1126
1127        // register instrument
1128        exchange.borrow_mut().add_instrument(instrument).unwrap();
1129
1130        // create order book delta at both bid and ask with incremented ts init and sequence
1131        let delta_buy = OrderBookDelta::new(
1132            crypto_perpetual_ethusdt.id,
1133            BookAction::Add,
1134            BookOrder::new(
1135                OrderSide::Buy,
1136                Price::from("1000.00"),
1137                Quantity::from("1.000"),
1138                1,
1139            ),
1140            0,
1141            0,
1142            UnixNanos::from(1),
1143            UnixNanos::from(1),
1144        );
1145        let delta_sell = OrderBookDelta::new(
1146            crypto_perpetual_ethusdt.id,
1147            BookAction::Add,
1148            BookOrder::new(
1149                OrderSide::Sell,
1150                Price::from("1001.00"),
1151                Quantity::from("1.000"),
1152                1,
1153            ),
1154            0,
1155            1,
1156            UnixNanos::from(2),
1157            UnixNanos::from(2),
1158        );
1159
1160        // process both deltas
1161        exchange.borrow_mut().process_order_book_delta(delta_buy);
1162        exchange.borrow_mut().process_order_book_delta(delta_sell);
1163
1164        let book = exchange
1165            .borrow()
1166            .get_book(crypto_perpetual_ethusdt.id)
1167            .unwrap()
1168            .clone();
1169        assert_eq!(book.update_count, 2);
1170        assert_eq!(book.sequence, 1);
1171        assert_eq!(book.ts_last, UnixNanos::from(2));
1172        let best_bid_price = exchange
1173            .borrow()
1174            .best_bid_price(crypto_perpetual_ethusdt.id);
1175        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1176        let best_ask_price = exchange
1177            .borrow()
1178            .best_ask_price(crypto_perpetual_ethusdt.id);
1179        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1180    }
1181
1182    #[rstest]
1183    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1184        let exchange = get_exchange(
1185            Venue::new("BINANCE"),
1186            AccountType::Margin,
1187            BookType::L2_MBP,
1188            None,
1189        );
1190        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1191
1192        // register instrument
1193        exchange.borrow_mut().add_instrument(instrument).unwrap();
1194
1195        // create two sell order book deltas with same timestamps and higher sequence
1196        let delta_sell_1 = OrderBookDelta::new(
1197            crypto_perpetual_ethusdt.id,
1198            BookAction::Add,
1199            BookOrder::new(
1200                OrderSide::Sell,
1201                Price::from("1000.00"),
1202                Quantity::from("3.000"),
1203                1,
1204            ),
1205            0,
1206            0,
1207            UnixNanos::from(1),
1208            UnixNanos::from(1),
1209        );
1210        let delta_sell_2 = OrderBookDelta::new(
1211            crypto_perpetual_ethusdt.id,
1212            BookAction::Add,
1213            BookOrder::new(
1214                OrderSide::Sell,
1215                Price::from("1001.00"),
1216                Quantity::from("1.000"),
1217                1,
1218            ),
1219            0,
1220            1,
1221            UnixNanos::from(1),
1222            UnixNanos::from(1),
1223        );
1224        let orderbook_deltas = OrderBookDeltas::new(
1225            crypto_perpetual_ethusdt.id,
1226            vec![delta_sell_1, delta_sell_2],
1227        );
1228
1229        // process both deltas
1230        exchange
1231            .borrow_mut()
1232            .process_order_book_deltas(orderbook_deltas);
1233
1234        let book = exchange
1235            .borrow()
1236            .get_book(crypto_perpetual_ethusdt.id)
1237            .unwrap()
1238            .clone();
1239        assert_eq!(book.update_count, 2);
1240        assert_eq!(book.sequence, 1);
1241        assert_eq!(book.ts_last, UnixNanos::from(1));
1242        let best_bid_price = exchange
1243            .borrow()
1244            .best_bid_price(crypto_perpetual_ethusdt.id);
1245        // no bid orders in orderbook deltas
1246        assert_eq!(best_bid_price, None);
1247        let best_ask_price = exchange
1248            .borrow()
1249            .best_ask_price(crypto_perpetual_ethusdt.id);
1250        // best ask price is the first order in orderbook deltas
1251        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1252    }
1253
1254    #[rstest]
1255    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1256        let exchange = get_exchange(
1257            Venue::new("BINANCE"),
1258            AccountType::Margin,
1259            BookType::L2_MBP,
1260            None,
1261        );
1262        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1263
1264        // register instrument
1265        exchange.borrow_mut().add_instrument(instrument).unwrap();
1266
1267        let instrument_status = InstrumentStatus::new(
1268            crypto_perpetual_ethusdt.id,
1269            MarketStatusAction::Close, // close the market
1270            UnixNanos::from(1),
1271            UnixNanos::from(1),
1272            None,
1273            None,
1274            None,
1275            None,
1276            None,
1277        );
1278
1279        exchange
1280            .borrow_mut()
1281            .process_instrument_status(instrument_status);
1282
1283        let market_status = exchange
1284            .borrow()
1285            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1286            .unwrap()
1287            .market_status;
1288        assert_eq!(market_status, MarketStatus::Closed);
1289    }
1290
1291    #[rstest]
1292    fn test_accounting() {
1293        let account_type = AccountType::Margin;
1294        let mut cache = Cache::default();
1295        let handler = get_message_saving_handler::<AccountState>(None);
1296        msgbus::register("Portfolio.update_account".into(), handler.clone());
1297        let margin_account = MarginAccount::new(
1298            AccountState::new(
1299                AccountId::from("SIM-001"),
1300                account_type,
1301                vec![AccountBalance::new(
1302                    Money::from("1000 USD"),
1303                    Money::from("0 USD"),
1304                    Money::from("1000 USD"),
1305                )],
1306                vec![],
1307                false,
1308                UUID4::default(),
1309                UnixNanos::default(),
1310                UnixNanos::default(),
1311                None,
1312            ),
1313            false,
1314        );
1315        let () = cache
1316            .add_account(AccountAny::Margin(margin_account))
1317            .unwrap();
1318        // build indexes
1319        cache.build_index();
1320
1321        let exchange = get_exchange(
1322            Venue::new("SIM"),
1323            account_type,
1324            BookType::L2_MBP,
1325            Some(Rc::new(RefCell::new(cache))),
1326        );
1327        exchange.borrow_mut().initialize_account();
1328
1329        // Test adjust account, increase balance by 500 USD
1330        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1331
1332        // Check if we received two messages, one for initial account state and one for adjusted account state
1333        let messages = get_saved_messages::<AccountState>(handler);
1334        assert_eq!(messages.len(), 2);
1335        let account_state_first = messages.first().unwrap();
1336        let account_state_second = messages.last().unwrap();
1337
1338        assert_eq!(account_state_first.balances.len(), 1);
1339        let current_balance = account_state_first.balances[0];
1340        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1341        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1342        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1343
1344        assert_eq!(account_state_second.balances.len(), 1);
1345        let current_balance = account_state_second.balances[0];
1346        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1347        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1348        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1349    }
1350
1351    #[rstest]
1352    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1353        // Create 3 inflight commands with different timestamps and counters
1354        let inflight1 = InflightCommand::new(
1355            UnixNanos::from(100),
1356            1,
1357            create_submit_order_command(UnixNanos::from(100)),
1358        );
1359        let inflight2 = InflightCommand::new(
1360            UnixNanos::from(200),
1361            2,
1362            create_submit_order_command(UnixNanos::from(200)),
1363        );
1364        let inflight3 = InflightCommand::new(
1365            UnixNanos::from(100),
1366            2,
1367            create_submit_order_command(UnixNanos::from(100)),
1368        );
1369
1370        // Create a binary heap and push the inflight commands
1371        let mut inflight_heap = BinaryHeap::new();
1372        inflight_heap.push(inflight1);
1373        inflight_heap.push(inflight2);
1374        inflight_heap.push(inflight3);
1375
1376        // Pop the inflight commands and check if they are in the correct order
1377        // by our custom ordering with counter and timestamp
1378        let first = inflight_heap.pop().unwrap();
1379        let second = inflight_heap.pop().unwrap();
1380        let third = inflight_heap.pop().unwrap();
1381
1382        assert_eq!(first.timestamp, UnixNanos::from(100));
1383        assert_eq!(first.counter, 1);
1384        assert_eq!(second.timestamp, UnixNanos::from(100));
1385        assert_eq!(second.counter, 2);
1386        assert_eq!(third.timestamp, UnixNanos::from(200));
1387        assert_eq!(third.counter, 2);
1388    }
1389
1390    #[rstest]
1391    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1392        let exchange = get_exchange(
1393            Venue::new("BINANCE"),
1394            AccountType::Margin,
1395            BookType::L2_MBP,
1396            None,
1397        );
1398
1399        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1400        exchange.borrow_mut().add_instrument(instrument).unwrap();
1401
1402        let command1 = create_submit_order_command(UnixNanos::from(100));
1403        let command2 = create_submit_order_command(UnixNanos::from(200));
1404
1405        exchange.borrow_mut().send(command1);
1406        exchange.borrow_mut().send(command2);
1407
1408        // Verify that message queue has 2 commands and inflight queue is empty
1409        // as we are not using latency model
1410        assert_eq!(exchange.borrow().message_queue.len(), 2);
1411        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1412
1413        // Process command and check that queues is empty
1414        exchange.borrow_mut().process(UnixNanos::from(300));
1415        assert_eq!(exchange.borrow().message_queue.len(), 0);
1416        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1417    }
1418
1419    #[rstest]
1420    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1421        // StaticLatencyModel adds base_latency to each operation latency
1422        // base=100, insert=200 -> effective insert latency = 300
1423        let latency_model = StaticLatencyModel::new(
1424            UnixNanos::from(100),
1425            UnixNanos::from(200),
1426            UnixNanos::from(300),
1427            UnixNanos::from(100),
1428        );
1429        let exchange = get_exchange(
1430            Venue::new("BINANCE"),
1431            AccountType::Margin,
1432            BookType::L2_MBP,
1433            None,
1434        );
1435        exchange
1436            .borrow_mut()
1437            .set_latency_model(Box::new(latency_model));
1438
1439        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1440        exchange.borrow_mut().add_instrument(instrument).unwrap();
1441
1442        let command1 = create_submit_order_command(UnixNanos::from(100));
1443        let command2 = create_submit_order_command(UnixNanos::from(150));
1444        exchange.borrow_mut().send(command1);
1445        exchange.borrow_mut().send(command2);
1446
1447        // Verify that inflight queue has 2 commands and message queue is empty
1448        assert_eq!(exchange.borrow().message_queue.len(), 0);
1449        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1450        // First inflight command: ts_init=100 + effective_insert_latency=300 = 400
1451        assert_eq!(
1452            exchange
1453                .borrow()
1454                .inflight_queue
1455                .iter()
1456                .next()
1457                .unwrap()
1458                .timestamp,
1459            UnixNanos::from(400)
1460        );
1461        // Second inflight command: ts_init=150 + effective_insert_latency=300 = 450
1462        assert_eq!(
1463            exchange
1464                .borrow()
1465                .inflight_queue
1466                .iter()
1467                .nth(1)
1468                .unwrap()
1469                .timestamp,
1470            UnixNanos::from(450)
1471        );
1472
1473        // Process at timestamp 420, and test that only first command is processed
1474        exchange.borrow_mut().process(UnixNanos::from(420));
1475        assert_eq!(exchange.borrow().message_queue.len(), 0);
1476        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1477        assert_eq!(
1478            exchange
1479                .borrow()
1480                .inflight_queue
1481                .iter()
1482                .next()
1483                .unwrap()
1484                .timestamp,
1485            UnixNanos::from(450)
1486        );
1487    }
1488}