nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, HashMap, VecDeque},
25    fmt::Debug,
26    rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31    UnixNanos,
32    correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35    client::ExecutionClient,
36    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40    accounts::AccountAny,
41    data::{
42        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43        QuoteTick, TradeTick,
44    },
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orderbook::OrderBook,
49    orders::PassiveOrderAny,
50    types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::Decimal;
53
54use crate::modules::SimulationModule;
55
56/// Represents commands with simulated network latency in a min-heap priority queue.
57/// The commands are ordered by timestamp for FIFO processing, with the
58/// earliest timestamp having the highest priority in the queue.
59#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61    ts: UnixNanos,
62    counter: u32,
63    command: TradingCommand,
64}
65
66impl InflightCommand {
67    const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68        Self {
69            ts,
70            counter,
71            command,
72        }
73    }
74}
75
76impl Ord for InflightCommand {
77    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
79        other
80            .ts
81            .cmp(&self.ts)
82            .then_with(|| other.counter.cmp(&self.counter))
83    }
84}
85
86impl PartialOrd for InflightCommand {
87    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92/// Simulated exchange venue for realistic trading execution during backtesting.
93///
94/// The `SimulatedExchange` provides a comprehensive simulation of a trading venue,
95/// including order matching engines, account management, and realistic execution
96/// models. It maintains order books, processes market data, and executes trades
97/// with configurable latency and fill models to accurately simulate real market
98/// conditions during backtesting.
99///
100/// Key features:
101/// - Multi-instrument order matching with realistic execution
102/// - Configurable fee, fill, and latency models
103/// - Support for various order types and execution options
104/// - Account balance and position management
105/// - Market data processing and order book maintenance
106/// - Simulation modules for custom venue behaviors
107pub struct SimulatedExchange {
108    pub id: Venue,
109    pub oms_type: OmsType,
110    pub account_type: AccountType,
111    starting_balances: Vec<Money>,
112    book_type: BookType,
113    default_leverage: Decimal,
114    exec_client: Option<Rc<dyn ExecutionClient>>,
115    pub base_currency: Option<Currency>,
116    fee_model: FeeModelAny,
117    fill_model: FillModel,
118    latency_model: Option<LatencyModel>,
119    instruments: HashMap<InstrumentId, InstrumentAny>,
120    matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
121    leverages: HashMap<InstrumentId, Decimal>,
122    modules: Vec<Box<dyn SimulationModule>>,
123    clock: Rc<RefCell<dyn Clock>>,
124    cache: Rc<RefCell<Cache>>,
125    message_queue: VecDeque<TradingCommand>,
126    inflight_queue: BinaryHeap<InflightCommand>,
127    inflight_counter: HashMap<UnixNanos, u32>,
128    bar_execution: bool,
129    reject_stop_orders: bool,
130    support_gtd_orders: bool,
131    support_contingent_orders: bool,
132    use_position_ids: bool,
133    use_random_ids: bool,
134    use_reduce_only: bool,
135    use_message_queue: bool,
136    allow_cash_borrowing: bool,
137    frozen_account: bool,
138    price_protection_points: u32,
139}
140
141impl Debug for SimulatedExchange {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct(stringify!(SimulatedExchange))
144            .field("id", &self.id)
145            .field("account_type", &self.account_type)
146            .finish()
147    }
148}
149
150impl SimulatedExchange {
151    /// Creates a new [`SimulatedExchange`] instance.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if:
156    /// - `starting_balances` is empty.
157    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
158    #[allow(clippy::too_many_arguments)]
159    pub fn new(
160        venue: Venue,
161        oms_type: OmsType,
162        account_type: AccountType,
163        starting_balances: Vec<Money>,
164        base_currency: Option<Currency>,
165        default_leverage: Decimal,
166        leverages: HashMap<InstrumentId, Decimal>,
167        modules: Vec<Box<dyn SimulationModule>>,
168        cache: Rc<RefCell<Cache>>,
169        clock: Rc<RefCell<dyn Clock>>,
170        fill_model: FillModel,
171        fee_model: FeeModelAny,
172        book_type: BookType,
173        latency_model: Option<LatencyModel>,
174        bar_execution: Option<bool>,
175        reject_stop_orders: Option<bool>,
176        support_gtd_orders: Option<bool>,
177        support_contingent_orders: Option<bool>,
178        use_position_ids: Option<bool>,
179        use_random_ids: Option<bool>,
180        use_reduce_only: Option<bool>,
181        use_message_queue: Option<bool>,
182        allow_cash_borrowing: Option<bool>,
183        frozen_account: Option<bool>,
184        price_protection_points: Option<u32>,
185    ) -> anyhow::Result<Self> {
186        if starting_balances.is_empty() {
187            anyhow::bail!("Starting balances must be provided")
188        }
189        if base_currency.is_some() && starting_balances.len() > 1 {
190            anyhow::bail!("single-currency account has multiple starting currencies")
191        }
192        // TODO register and load modules
193        Ok(Self {
194            id: venue,
195            oms_type,
196            account_type,
197            starting_balances,
198            book_type,
199            default_leverage,
200            exec_client: None,
201            base_currency,
202            fee_model,
203            fill_model,
204            latency_model,
205            instruments: HashMap::new(),
206            matching_engines: HashMap::new(),
207            leverages,
208            modules,
209            clock,
210            cache,
211            message_queue: VecDeque::new(),
212            inflight_queue: BinaryHeap::new(),
213            inflight_counter: HashMap::new(),
214            bar_execution: bar_execution.unwrap_or(true),
215            reject_stop_orders: reject_stop_orders.unwrap_or(true),
216            support_gtd_orders: support_gtd_orders.unwrap_or(true),
217            support_contingent_orders: support_contingent_orders.unwrap_or(true),
218            use_position_ids: use_position_ids.unwrap_or(true),
219            use_random_ids: use_random_ids.unwrap_or(false),
220            use_reduce_only: use_reduce_only.unwrap_or(true),
221            use_message_queue: use_message_queue.unwrap_or(true),
222            allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
223            frozen_account: frozen_account.unwrap_or(false),
224            price_protection_points: price_protection_points.unwrap_or(0),
225        })
226    }
227
228    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
229        self.exec_client = Some(client);
230    }
231
232    pub fn set_fill_model(&mut self, fill_model: FillModel) {
233        for matching_engine in self.matching_engines.values_mut() {
234            matching_engine.set_fill_model(fill_model.clone());
235            log::info!(
236                "Setting fill model for {} to {}",
237                matching_engine.venue,
238                self.fill_model
239            );
240        }
241        self.fill_model = fill_model;
242    }
243
244    pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
245        self.latency_model = Some(latency_model);
246    }
247
248    pub fn initialize_account(&mut self) {
249        self.generate_fresh_account_state();
250    }
251
252    /// Adds an instrument to the simulated exchange and initializes its matching engine.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
257    ///
258    /// # Panics
259    ///
260    /// Panics if the instrument cannot be added to the exchange.
261    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
262        check_equal(
263            &instrument.id().venue,
264            &self.id,
265            "Venue of instrument id",
266            "Venue of simulated exchange",
267        )
268        .expect(FAILED);
269
270        if self.account_type == AccountType::Cash
271            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
272                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
273        {
274            anyhow::bail!("Cash account cannot trade futures or perpetuals")
275        }
276
277        self.instruments.insert(instrument.id(), instrument.clone());
278
279        let price_protection = if self.price_protection_points == 0 {
280            None
281        } else {
282            Some(self.price_protection_points)
283        };
284
285        let matching_engine_config = OrderMatchingEngineConfig::new(
286            self.bar_execution,
287            self.reject_stop_orders,
288            self.support_gtd_orders,
289            self.support_contingent_orders,
290            self.use_position_ids,
291            self.use_random_ids,
292            self.use_reduce_only,
293        )
294        .with_price_protection_points(price_protection);
295        let instrument_id = instrument.id();
296        let matching_engine = OrderMatchingEngine::new(
297            instrument,
298            self.instruments.len() as u32,
299            self.fill_model.clone(),
300            self.fee_model.clone(),
301            self.book_type,
302            self.oms_type,
303            self.account_type,
304            self.clock.clone(),
305            Rc::clone(&self.cache),
306            matching_engine_config,
307        );
308        self.matching_engines.insert(instrument_id, matching_engine);
309
310        log::info!("Added instrument {instrument_id} and created matching engine");
311        Ok(())
312    }
313
314    #[must_use]
315    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
316        self.matching_engines
317            .get(&instrument_id)
318            .and_then(OrderMatchingEngine::best_bid_price)
319    }
320
321    #[must_use]
322    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
323        self.matching_engines
324            .get(&instrument_id)
325            .and_then(OrderMatchingEngine::best_ask_price)
326    }
327
328    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
329        self.matching_engines
330            .get(&instrument_id)
331            .map(OrderMatchingEngine::get_book)
332    }
333
334    #[must_use]
335    pub fn get_matching_engine(
336        &self,
337        instrument_id: &InstrumentId,
338    ) -> Option<&OrderMatchingEngine> {
339        self.matching_engines.get(instrument_id)
340    }
341
342    #[must_use]
343    pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
344        &self.matching_engines
345    }
346
347    #[must_use]
348    pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
349        let mut books = HashMap::new();
350        for (instrument_id, matching_engine) in &self.matching_engines {
351            books.insert(*instrument_id, matching_engine.get_book().clone());
352        }
353        books
354    }
355
356    #[must_use]
357    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
358        instrument_id
359            .and_then(|id| {
360                self.matching_engines
361                    .get(&id)
362                    .map(OrderMatchingEngine::get_open_orders)
363            })
364            .unwrap_or_else(|| {
365                self.matching_engines
366                    .values()
367                    .flat_map(OrderMatchingEngine::get_open_orders)
368                    .collect()
369            })
370    }
371
372    #[must_use]
373    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
374        instrument_id
375            .and_then(|id| {
376                self.matching_engines
377                    .get(&id)
378                    .map(|engine| engine.get_open_bid_orders().to_vec())
379            })
380            .unwrap_or_else(|| {
381                self.matching_engines
382                    .values()
383                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
384                    .collect()
385            })
386    }
387
388    #[must_use]
389    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
390        instrument_id
391            .and_then(|id| {
392                self.matching_engines
393                    .get(&id)
394                    .map(|engine| engine.get_open_ask_orders().to_vec())
395            })
396            .unwrap_or_else(|| {
397                self.matching_engines
398                    .values()
399                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
400                    .collect()
401            })
402    }
403
404    /// # Panics
405    ///
406    /// Panics if retrieving the account from the execution client fails.
407    #[must_use]
408    pub fn get_account(&self) -> Option<AccountAny> {
409        self.exec_client
410            .as_ref()
411            .map(|client| client.get_account().unwrap())
412    }
413
414    /// # Panics
415    ///
416    /// Panics if generating account state fails during adjustment.
417    pub fn adjust_account(&mut self, adjustment: Money) {
418        if self.frozen_account {
419            // Nothing to adjust
420            return;
421        }
422
423        if let Some(exec_client) = &self.exec_client {
424            let venue = exec_client.venue();
425            println!("Adjusting account for venue {venue}");
426            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
427                match account.balance(Some(adjustment.currency)) {
428                    Some(balance) => {
429                        let mut current_balance = *balance;
430                        current_balance.total += adjustment;
431                        current_balance.free += adjustment;
432
433                        let margins = match account {
434                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
435                            _ => HashMap::new(),
436                        };
437
438                        if let Some(exec_client) = &self.exec_client {
439                            exec_client
440                                .generate_account_state(
441                                    vec![current_balance],
442                                    margins.values().copied().collect(),
443                                    true,
444                                    self.clock.borrow().timestamp_ns(),
445                                )
446                                .unwrap();
447                        }
448                    }
449                    None => {
450                        log::error!(
451                            "Cannot adjust account: no balance for currency {}",
452                            adjustment.currency
453                        );
454                    }
455                }
456            } else {
457                log::error!("Cannot adjust account: no account for venue {venue}");
458            }
459        }
460    }
461
462    pub fn send(&mut self, command: TradingCommand) {
463        if !self.use_message_queue {
464            self.process_trading_command(command);
465        } else if self.latency_model.is_none() {
466            self.message_queue.push_back(command);
467        } else {
468            let (ts, counter) = self.generate_inflight_command(&command);
469            self.inflight_queue
470                .push(InflightCommand::new(ts, counter, command));
471        }
472    }
473
474    /// # Panics
475    ///
476    /// Panics if the command is invalid when generating inflight command.
477    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
478        if let Some(latency_model) = &self.latency_model {
479            let ts = match command {
480                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
481                    command.ts_init() + latency_model.insert_latency_nanos
482                }
483                TradingCommand::ModifyOrder(_) => {
484                    command.ts_init() + latency_model.update_latency_nanos
485                }
486                TradingCommand::CancelOrder(_)
487                | TradingCommand::CancelAllOrders(_)
488                | TradingCommand::BatchCancelOrders(_) => {
489                    command.ts_init() + latency_model.delete_latency_nanos
490                }
491                _ => panic!("Cannot handle command: {command:?}"),
492            };
493
494            let counter = self
495                .inflight_counter
496                .entry(ts)
497                .and_modify(|e| *e += 1)
498                .or_insert(1);
499
500            (ts, *counter)
501        } else {
502            panic!("Latency model should be initialized");
503        }
504    }
505
506    /// # Panics
507    ///
508    /// Panics if adding a missing instrument during delta processing fails.
509    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
510        for module in &self.modules {
511            module.pre_process(Data::Delta(delta));
512        }
513
514        if !self.matching_engines.contains_key(&delta.instrument_id) {
515            let instrument = {
516                let cache = self.cache.as_ref().borrow();
517                cache.instrument(&delta.instrument_id).cloned()
518            };
519
520            if let Some(instrument) = instrument {
521                self.add_instrument(instrument).unwrap();
522            } else {
523                panic!(
524                    "No matching engine found for instrument {}",
525                    delta.instrument_id
526                );
527            }
528        }
529
530        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
531            matching_engine.process_order_book_delta(&delta).unwrap();
532        } else {
533            panic!("Matching engine should be initialized");
534        }
535    }
536
537    /// # Panics
538    ///
539    /// Panics if adding a missing instrument during deltas processing fails.
540    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
541        for module in &self.modules {
542            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
543        }
544
545        if !self.matching_engines.contains_key(&deltas.instrument_id) {
546            let instrument = {
547                let cache = self.cache.as_ref().borrow();
548                cache.instrument(&deltas.instrument_id).cloned()
549            };
550
551            if let Some(instrument) = instrument {
552                self.add_instrument(instrument).unwrap();
553            } else {
554                panic!(
555                    "No matching engine found for instrument {}",
556                    deltas.instrument_id
557                );
558            }
559        }
560
561        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
562            matching_engine.process_order_book_deltas(&deltas).unwrap();
563        } else {
564            panic!("Matching engine should be initialized");
565        }
566    }
567
568    /// # Panics
569    ///
570    /// Panics if adding a missing instrument during quote tick processing fails.
571    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
572        for module in &self.modules {
573            module.pre_process(Data::Quote(quote.to_owned()));
574        }
575
576        if !self.matching_engines.contains_key(&quote.instrument_id) {
577            let instrument = {
578                let cache = self.cache.as_ref().borrow();
579                cache.instrument(&quote.instrument_id).cloned()
580            };
581
582            if let Some(instrument) = instrument {
583                self.add_instrument(instrument).unwrap();
584            } else {
585                panic!(
586                    "No matching engine found for instrument {}",
587                    quote.instrument_id
588                );
589            }
590        }
591
592        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
593            matching_engine.process_quote_tick(quote);
594        } else {
595            panic!("Matching engine should be initialized");
596        }
597    }
598
599    /// # Panics
600    ///
601    /// Panics if adding a missing instrument during trade tick processing fails.
602    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
603        for module in &self.modules {
604            module.pre_process(Data::Trade(trade.to_owned()));
605        }
606
607        if !self.matching_engines.contains_key(&trade.instrument_id) {
608            let instrument = {
609                let cache = self.cache.as_ref().borrow();
610                cache.instrument(&trade.instrument_id).cloned()
611            };
612
613            if let Some(instrument) = instrument {
614                self.add_instrument(instrument).unwrap();
615            } else {
616                panic!(
617                    "No matching engine found for instrument {}",
618                    trade.instrument_id
619                );
620            }
621        }
622
623        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
624            matching_engine.process_trade_tick(trade);
625        } else {
626            panic!("Matching engine should be initialized");
627        }
628    }
629
630    /// # Panics
631    ///
632    /// Panics if adding a missing instrument during bar processing fails.
633    pub fn process_bar(&mut self, bar: Bar) {
634        for module in &self.modules {
635            module.pre_process(Data::Bar(bar));
636        }
637
638        if !self.matching_engines.contains_key(&bar.instrument_id()) {
639            let instrument = {
640                let cache = self.cache.as_ref().borrow();
641                cache.instrument(&bar.instrument_id()).cloned()
642            };
643
644            if let Some(instrument) = instrument {
645                self.add_instrument(instrument).unwrap();
646            } else {
647                panic!(
648                    "No matching engine found for instrument {}",
649                    bar.instrument_id()
650                );
651            }
652        }
653
654        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
655            matching_engine.process_bar(&bar);
656        } else {
657            panic!("Matching engine should be initialized");
658        }
659    }
660
661    /// # Panics
662    ///
663    /// Panics if adding a missing instrument during instrument status processing fails.
664    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
665        // TODO add module preprocessing
666
667        if !self.matching_engines.contains_key(&status.instrument_id) {
668            let instrument = {
669                let cache = self.cache.as_ref().borrow();
670                cache.instrument(&status.instrument_id).cloned()
671            };
672
673            if let Some(instrument) = instrument {
674                self.add_instrument(instrument).unwrap();
675            } else {
676                panic!(
677                    "No matching engine found for instrument {}",
678                    status.instrument_id
679                );
680            }
681        }
682
683        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
684            matching_engine.process_status(status.action);
685        } else {
686            panic!("Matching engine should be initialized");
687        }
688    }
689
690    /// # Panics
691    ///
692    /// Panics if popping an inflight command fails during processing.
693    pub fn process(&mut self, ts_now: UnixNanos) {
694        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
695
696        // Process inflight commands
697        while let Some(inflight) = self.inflight_queue.peek() {
698            if inflight.ts > ts_now {
699                // Future commands remain in the queue
700                break;
701            }
702            // We get the inflight command, remove it from the queue and process it
703            let inflight = self.inflight_queue.pop().unwrap();
704            self.process_trading_command(inflight.command);
705        }
706
707        // Process regular message queue
708        while let Some(command) = self.message_queue.pop_front() {
709            self.process_trading_command(command);
710        }
711    }
712
713    pub fn reset(&mut self) {
714        for module in &self.modules {
715            module.reset();
716        }
717
718        self.generate_fresh_account_state();
719
720        for matching_engine in self.matching_engines.values_mut() {
721            matching_engine.reset();
722        }
723
724        // TODO Clear the inflight and message queues
725        log::info!("Resetting exchange state");
726    }
727
728    /// # Panics
729    ///
730    /// Panics if execution client is uninitialized when processing trading command.
731    pub fn process_trading_command(&mut self, command: TradingCommand) {
732        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
733            let account_id = if let Some(exec_client) = &self.exec_client {
734                exec_client.account_id()
735            } else {
736                panic!("Execution client should be initialized");
737            };
738            match command {
739                TradingCommand::SubmitOrder(mut command) => {
740                    matching_engine.process_order(&mut command.order, account_id);
741                }
742                TradingCommand::ModifyOrder(ref command) => {
743                    matching_engine.process_modify(command, account_id);
744                }
745                TradingCommand::CancelOrder(ref command) => {
746                    matching_engine.process_cancel(command, account_id);
747                }
748                TradingCommand::CancelAllOrders(ref command) => {
749                    matching_engine.process_cancel_all(command, account_id);
750                }
751                TradingCommand::BatchCancelOrders(ref command) => {
752                    matching_engine.process_batch_cancel(command, account_id);
753                }
754                TradingCommand::SubmitOrderList(mut command) => {
755                    for order in &mut command.order_list.orders {
756                        matching_engine.process_order(order, account_id);
757                    }
758                }
759                _ => {}
760            }
761        } else {
762            panic!(
763                "Matching engine not found for instrument {}",
764                command.instrument_id()
765            );
766        }
767    }
768
769    /// # Panics
770    ///
771    /// Panics if generating fresh account state fails.
772    pub fn generate_fresh_account_state(&self) {
773        let balances: Vec<AccountBalance> = self
774            .starting_balances
775            .iter()
776            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
777            .collect();
778
779        if let Some(exec_client) = &self.exec_client {
780            exec_client
781                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
782                .unwrap();
783        }
784
785        // Set leverages
786        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
787            margin_account.set_default_leverage(self.default_leverage);
788
789            // Set instrument specific leverages
790            for (instrument_id, leverage) in &self.leverages {
791                margin_account.set_leverage(*instrument_id, *leverage);
792            }
793        }
794    }
795}
796
797////////////////////////////////////////////////////////////////////////////////
798// Tests
799////////////////////////////////////////////////////////////////////////////////
800
801#[cfg(test)]
802mod tests {
803    use std::{
804        cell::RefCell,
805        collections::{BinaryHeap, HashMap},
806        rc::Rc,
807        sync::LazyLock,
808    };
809
810    use nautilus_common::{
811        cache::Cache,
812        clock::TestClock,
813        messages::execution::{SubmitOrder, TradingCommand},
814        msgbus::{
815            self,
816            stubs::{get_message_saving_handler, get_saved_messages},
817        },
818    };
819    use nautilus_core::{AtomicTime, UUID4, UnixNanos};
820    use nautilus_execution::models::{
821        fee::{FeeModelAny, MakerTakerFeeModel},
822        fill::FillModel,
823        latency::LatencyModel,
824    };
825    use nautilus_model::{
826        accounts::{AccountAny, MarginAccount},
827        data::{
828            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
829            TradeTick,
830        },
831        enums::{
832            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
833            OmsType, OrderSide, OrderType,
834        },
835        events::AccountState,
836        identifiers::{
837            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
838            VenueOrderId,
839        },
840        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
841        orders::OrderTestBuilder,
842        types::{AccountBalance, Currency, Money, Price, Quantity},
843    };
844    use rstest::rstest;
845
846    use crate::{
847        exchange::{InflightCommand, SimulatedExchange},
848        execution_client::BacktestExecutionClient,
849    };
850
851    static ATOMIC_TIME: LazyLock<AtomicTime> =
852        LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
853
854    fn get_exchange(
855        venue: Venue,
856        account_type: AccountType,
857        book_type: BookType,
858        cache: Option<Rc<RefCell<Cache>>>,
859    ) -> Rc<RefCell<SimulatedExchange>> {
860        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
861        let clock = Rc::new(RefCell::new(TestClock::new()));
862        let exchange = Rc::new(RefCell::new(
863            SimulatedExchange::new(
864                venue,
865                OmsType::Netting,
866                account_type,
867                vec![Money::new(1000.0, Currency::USD())],
868                None,
869                1.into(),
870                HashMap::new(),
871                vec![],
872                cache.clone(),
873                clock,
874                FillModel::default(),
875                FeeModelAny::MakerTaker(MakerTakerFeeModel),
876                book_type,
877                None,
878                None,
879                None,
880                None,
881                None,
882                None,
883                None,
884                None,
885                None,
886                None,
887                None,
888                None,
889            )
890            .unwrap(),
891        ));
892
893        let clock = TestClock::new();
894        let execution_client = BacktestExecutionClient::new(
895            TraderId::default(),
896            AccountId::default(),
897            exchange.clone(),
898            cache,
899            Rc::new(RefCell::new(clock)),
900            None,
901            None,
902        );
903        exchange
904            .borrow_mut()
905            .register_client(Rc::new(execution_client));
906
907        exchange
908    }
909
910    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
911        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
912        let order = OrderTestBuilder::new(OrderType::Market)
913            .instrument_id(instrument_id)
914            .quantity(Quantity::from(1))
915            .build();
916        TradingCommand::SubmitOrder(
917            SubmitOrder::new(
918                TraderId::default(),
919                ClientId::default(),
920                StrategyId::default(),
921                instrument_id,
922                ClientOrderId::default(),
923                VenueOrderId::default(),
924                order,
925                None,
926                None,
927                None, // params
928                UUID4::default(),
929                ts_init,
930            )
931            .unwrap(),
932        )
933    }
934
935    #[rstest]
936    #[should_panic(
937        expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
938    )]
939    fn test_venue_mismatch_between_exchange_and_instrument(
940        crypto_perpetual_ethusdt: CryptoPerpetual,
941    ) {
942        let exchange = get_exchange(
943            Venue::new("SIM"),
944            AccountType::Margin,
945            BookType::L1_MBP,
946            None,
947        );
948        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
949        exchange.borrow_mut().add_instrument(instrument).unwrap();
950    }
951
952    #[rstest]
953    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
954    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
955        let exchange = get_exchange(
956            Venue::new("BINANCE"),
957            AccountType::Cash,
958            BookType::L1_MBP,
959            None,
960        );
961        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
962        exchange.borrow_mut().add_instrument(instrument).unwrap();
963    }
964
965    #[rstest]
966    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
967        let exchange = get_exchange(
968            Venue::new("BINANCE"),
969            AccountType::Margin,
970            BookType::L1_MBP,
971            None,
972        );
973        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
974
975        // register instrument
976        exchange.borrow_mut().add_instrument(instrument).unwrap();
977
978        // process tick
979        let quote_tick = QuoteTick::new(
980            crypto_perpetual_ethusdt.id,
981            Price::from("1000"),
982            Price::from("1001"),
983            Quantity::from(1),
984            Quantity::from(1),
985            UnixNanos::default(),
986            UnixNanos::default(),
987        );
988        exchange.borrow_mut().process_quote_tick(&quote_tick);
989
990        let best_bid_price = exchange
991            .borrow()
992            .best_bid_price(crypto_perpetual_ethusdt.id);
993        assert_eq!(best_bid_price, Some(Price::from("1000")));
994        let best_ask_price = exchange
995            .borrow()
996            .best_ask_price(crypto_perpetual_ethusdt.id);
997        assert_eq!(best_ask_price, Some(Price::from("1001")));
998    }
999
1000    #[rstest]
1001    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1002        let exchange = get_exchange(
1003            Venue::new("BINANCE"),
1004            AccountType::Margin,
1005            BookType::L1_MBP,
1006            None,
1007        );
1008        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1009
1010        // register instrument
1011        exchange.borrow_mut().add_instrument(instrument).unwrap();
1012
1013        // process tick
1014        let trade_tick = TradeTick::new(
1015            crypto_perpetual_ethusdt.id,
1016            Price::from("1000"),
1017            Quantity::from(1),
1018            AggressorSide::Buyer,
1019            TradeId::from("1"),
1020            UnixNanos::default(),
1021            UnixNanos::default(),
1022        );
1023        exchange.borrow_mut().process_trade_tick(&trade_tick);
1024
1025        let best_bid_price = exchange
1026            .borrow()
1027            .best_bid_price(crypto_perpetual_ethusdt.id);
1028        assert_eq!(best_bid_price, Some(Price::from("1000")));
1029        let best_ask = exchange
1030            .borrow()
1031            .best_ask_price(crypto_perpetual_ethusdt.id);
1032        assert_eq!(best_ask, Some(Price::from("1000")));
1033    }
1034
1035    #[rstest]
1036    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1037        let exchange = get_exchange(
1038            Venue::new("BINANCE"),
1039            AccountType::Margin,
1040            BookType::L1_MBP,
1041            None,
1042        );
1043        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1044
1045        // register instrument
1046        exchange.borrow_mut().add_instrument(instrument).unwrap();
1047
1048        // process bar
1049        let bar = Bar::new(
1050            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1051            Price::from("1500.00"),
1052            Price::from("1505.00"),
1053            Price::from("1490.00"),
1054            Price::from("1502.00"),
1055            Quantity::from(100),
1056            UnixNanos::default(),
1057            UnixNanos::default(),
1058        );
1059        exchange.borrow_mut().process_bar(bar);
1060
1061        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1062        let best_bid_price = exchange
1063            .borrow()
1064            .best_bid_price(crypto_perpetual_ethusdt.id);
1065        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1066        let best_ask_price = exchange
1067            .borrow()
1068            .best_ask_price(crypto_perpetual_ethusdt.id);
1069        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1070    }
1071
1072    #[rstest]
1073    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1074        let exchange = get_exchange(
1075            Venue::new("BINANCE"),
1076            AccountType::Margin,
1077            BookType::L1_MBP,
1078            None,
1079        );
1080        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1081
1082        // register instrument
1083        exchange.borrow_mut().add_instrument(instrument).unwrap();
1084
1085        // create both bid and ask based bars
1086        // add +1 on ask to make sure it is different from bid
1087        let bar_bid = Bar::new(
1088            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1089            Price::from("1500.00"),
1090            Price::from("1505.00"),
1091            Price::from("1490.00"),
1092            Price::from("1502.00"),
1093            Quantity::from(100),
1094            UnixNanos::from(1),
1095            UnixNanos::from(1),
1096        );
1097        let bar_ask = Bar::new(
1098            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1099            Price::from("1501.00"),
1100            Price::from("1506.00"),
1101            Price::from("1491.00"),
1102            Price::from("1503.00"),
1103            Quantity::from(100),
1104            UnixNanos::from(1),
1105            UnixNanos::from(1),
1106        );
1107
1108        // process them
1109        exchange.borrow_mut().process_bar(bar_bid);
1110        exchange.borrow_mut().process_bar(bar_ask);
1111
1112        // current bid and ask prices will be the corresponding close of the ask and bid bar
1113        let best_bid_price = exchange
1114            .borrow()
1115            .best_bid_price(crypto_perpetual_ethusdt.id);
1116        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1117        let best_ask_price = exchange
1118            .borrow()
1119            .best_ask_price(crypto_perpetual_ethusdt.id);
1120        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1121    }
1122
1123    #[rstest]
1124    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1125        let exchange = get_exchange(
1126            Venue::new("BINANCE"),
1127            AccountType::Margin,
1128            BookType::L2_MBP,
1129            None,
1130        );
1131        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1132
1133        // register instrument
1134        exchange.borrow_mut().add_instrument(instrument).unwrap();
1135
1136        // create order book delta at both bid and ask with incremented ts init and sequence
1137        let delta_buy = OrderBookDelta::new(
1138            crypto_perpetual_ethusdt.id,
1139            BookAction::Add,
1140            BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1141            0,
1142            0,
1143            UnixNanos::from(1),
1144            UnixNanos::from(1),
1145        );
1146        let delta_sell = OrderBookDelta::new(
1147            crypto_perpetual_ethusdt.id,
1148            BookAction::Add,
1149            BookOrder::new(
1150                OrderSide::Sell,
1151                Price::from("1001.00"),
1152                Quantity::from(1),
1153                1,
1154            ),
1155            0,
1156            1,
1157            UnixNanos::from(2),
1158            UnixNanos::from(2),
1159        );
1160
1161        // process both deltas
1162        exchange.borrow_mut().process_order_book_delta(delta_buy);
1163        exchange.borrow_mut().process_order_book_delta(delta_sell);
1164
1165        let book = exchange
1166            .borrow()
1167            .get_book(crypto_perpetual_ethusdt.id)
1168            .unwrap()
1169            .clone();
1170        assert_eq!(book.update_count, 2);
1171        assert_eq!(book.sequence, 1);
1172        assert_eq!(book.ts_last, UnixNanos::from(2));
1173        let best_bid_price = exchange
1174            .borrow()
1175            .best_bid_price(crypto_perpetual_ethusdt.id);
1176        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1177        let best_ask_price = exchange
1178            .borrow()
1179            .best_ask_price(crypto_perpetual_ethusdt.id);
1180        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1181    }
1182
1183    #[rstest]
1184    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1185        let exchange = get_exchange(
1186            Venue::new("BINANCE"),
1187            AccountType::Margin,
1188            BookType::L2_MBP,
1189            None,
1190        );
1191        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1192
1193        // register instrument
1194        exchange.borrow_mut().add_instrument(instrument).unwrap();
1195
1196        // create two sell order book deltas with same timestamps and higher sequence
1197        let delta_sell_1 = OrderBookDelta::new(
1198            crypto_perpetual_ethusdt.id,
1199            BookAction::Add,
1200            BookOrder::new(
1201                OrderSide::Sell,
1202                Price::from("1000.00"),
1203                Quantity::from(3),
1204                1,
1205            ),
1206            0,
1207            0,
1208            UnixNanos::from(1),
1209            UnixNanos::from(1),
1210        );
1211        let delta_sell_2 = OrderBookDelta::new(
1212            crypto_perpetual_ethusdt.id,
1213            BookAction::Add,
1214            BookOrder::new(
1215                OrderSide::Sell,
1216                Price::from("1001.00"),
1217                Quantity::from(1),
1218                1,
1219            ),
1220            0,
1221            1,
1222            UnixNanos::from(1),
1223            UnixNanos::from(1),
1224        );
1225        let orderbook_deltas = OrderBookDeltas::new(
1226            crypto_perpetual_ethusdt.id,
1227            vec![delta_sell_1, delta_sell_2],
1228        );
1229
1230        // process both deltas
1231        exchange
1232            .borrow_mut()
1233            .process_order_book_deltas(orderbook_deltas);
1234
1235        let book = exchange
1236            .borrow()
1237            .get_book(crypto_perpetual_ethusdt.id)
1238            .unwrap()
1239            .clone();
1240        assert_eq!(book.update_count, 2);
1241        assert_eq!(book.sequence, 1);
1242        assert_eq!(book.ts_last, UnixNanos::from(1));
1243        let best_bid_price = exchange
1244            .borrow()
1245            .best_bid_price(crypto_perpetual_ethusdt.id);
1246        // no bid orders in orderbook deltas
1247        assert_eq!(best_bid_price, None);
1248        let best_ask_price = exchange
1249            .borrow()
1250            .best_ask_price(crypto_perpetual_ethusdt.id);
1251        // best ask price is the first order in orderbook deltas
1252        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1253    }
1254
1255    #[rstest]
1256    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1257        let exchange = get_exchange(
1258            Venue::new("BINANCE"),
1259            AccountType::Margin,
1260            BookType::L2_MBP,
1261            None,
1262        );
1263        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1264
1265        // register instrument
1266        exchange.borrow_mut().add_instrument(instrument).unwrap();
1267
1268        let instrument_status = InstrumentStatus::new(
1269            crypto_perpetual_ethusdt.id,
1270            MarketStatusAction::Close, // close the market
1271            UnixNanos::from(1),
1272            UnixNanos::from(1),
1273            None,
1274            None,
1275            None,
1276            None,
1277            None,
1278        );
1279
1280        exchange
1281            .borrow_mut()
1282            .process_instrument_status(instrument_status);
1283
1284        let market_status = exchange
1285            .borrow()
1286            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1287            .unwrap()
1288            .market_status;
1289        assert_eq!(market_status, MarketStatus::Closed);
1290    }
1291
1292    #[rstest]
1293    fn test_accounting() {
1294        let account_type = AccountType::Margin;
1295        let mut cache = Cache::default();
1296        let handler = get_message_saving_handler::<AccountState>(None);
1297        msgbus::register("Portfolio.update_account".into(), handler.clone());
1298        let margin_account = MarginAccount::new(
1299            AccountState::new(
1300                AccountId::from("SIM-001"),
1301                account_type,
1302                vec![AccountBalance::new(
1303                    Money::from("1000 USD"),
1304                    Money::from("0 USD"),
1305                    Money::from("1000 USD"),
1306                )],
1307                vec![],
1308                false,
1309                UUID4::default(),
1310                UnixNanos::default(),
1311                UnixNanos::default(),
1312                None,
1313            ),
1314            false,
1315        );
1316        let () = cache
1317            .add_account(AccountAny::Margin(margin_account))
1318            .unwrap();
1319        // build indexes
1320        cache.build_index();
1321
1322        let exchange = get_exchange(
1323            Venue::new("SIM"),
1324            account_type,
1325            BookType::L2_MBP,
1326            Some(Rc::new(RefCell::new(cache))),
1327        );
1328        exchange.borrow_mut().initialize_account();
1329
1330        // Test adjust account, increase balance by 500 USD
1331        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1332
1333        // Check if we received two messages, one for initial account state and one for adjusted account state
1334        let messages = get_saved_messages::<AccountState>(handler);
1335        assert_eq!(messages.len(), 2);
1336        let account_state_first = messages.first().unwrap();
1337        let account_state_second = messages.last().unwrap();
1338
1339        assert_eq!(account_state_first.balances.len(), 1);
1340        let current_balance = account_state_first.balances[0];
1341        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1342        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1343        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1344
1345        assert_eq!(account_state_second.balances.len(), 1);
1346        let current_balance = account_state_second.balances[0];
1347        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1348        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1349        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1350    }
1351
1352    #[rstest]
1353    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1354        // Create 3 inflight commands with different timestamps and counters
1355        let inflight1 = InflightCommand::new(
1356            UnixNanos::from(100),
1357            1,
1358            create_submit_order_command(UnixNanos::from(100)),
1359        );
1360        let inflight2 = InflightCommand::new(
1361            UnixNanos::from(200),
1362            2,
1363            create_submit_order_command(UnixNanos::from(200)),
1364        );
1365        let inflight3 = InflightCommand::new(
1366            UnixNanos::from(100),
1367            2,
1368            create_submit_order_command(UnixNanos::from(100)),
1369        );
1370
1371        // Create a binary heap and push the inflight commands
1372        let mut inflight_heap = BinaryHeap::new();
1373        inflight_heap.push(inflight1);
1374        inflight_heap.push(inflight2);
1375        inflight_heap.push(inflight3);
1376
1377        // Pop the inflight commands and check if they are in the correct order
1378        // by our custom ordering with counter and timestamp
1379        let first = inflight_heap.pop().unwrap();
1380        let second = inflight_heap.pop().unwrap();
1381        let third = inflight_heap.pop().unwrap();
1382
1383        assert_eq!(first.ts, UnixNanos::from(100));
1384        assert_eq!(first.counter, 1);
1385        assert_eq!(second.ts, UnixNanos::from(100));
1386        assert_eq!(second.counter, 2);
1387        assert_eq!(third.ts, UnixNanos::from(200));
1388        assert_eq!(third.counter, 2);
1389    }
1390
1391    #[rstest]
1392    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1393        let exchange = get_exchange(
1394            Venue::new("BINANCE"),
1395            AccountType::Margin,
1396            BookType::L2_MBP,
1397            None,
1398        );
1399
1400        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1401        exchange.borrow_mut().add_instrument(instrument).unwrap();
1402
1403        let command1 = create_submit_order_command(UnixNanos::from(100));
1404        let command2 = create_submit_order_command(UnixNanos::from(200));
1405
1406        exchange.borrow_mut().send(command1);
1407        exchange.borrow_mut().send(command2);
1408
1409        // Verify that message queue has 2 commands and inflight queue is empty
1410        // as we are not using latency model
1411        assert_eq!(exchange.borrow().message_queue.len(), 2);
1412        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1413
1414        // Process command and check that queues is empty
1415        exchange.borrow_mut().process(UnixNanos::from(300));
1416        assert_eq!(exchange.borrow().message_queue.len(), 0);
1417        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1418    }
1419
1420    #[rstest]
1421    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1422        let latency_model = LatencyModel::new(
1423            UnixNanos::from(100),
1424            UnixNanos::from(200),
1425            UnixNanos::from(300),
1426            UnixNanos::from(100),
1427        );
1428        let exchange = get_exchange(
1429            Venue::new("BINANCE"),
1430            AccountType::Margin,
1431            BookType::L2_MBP,
1432            None,
1433        );
1434        exchange.borrow_mut().set_latency_model(latency_model);
1435
1436        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1437        exchange.borrow_mut().add_instrument(instrument).unwrap();
1438
1439        let command1 = create_submit_order_command(UnixNanos::from(100));
1440        let command2 = create_submit_order_command(UnixNanos::from(150));
1441        exchange.borrow_mut().send(command1);
1442        exchange.borrow_mut().send(command2);
1443
1444        // Verify that inflight queue has 2 commands and message queue is empty
1445        assert_eq!(exchange.borrow().message_queue.len(), 0);
1446        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1447        // First inflight command should have timestamp at 100 and 200 insert latency
1448        assert_eq!(
1449            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1450            UnixNanos::from(300)
1451        );
1452        // Second inflight command should have timestamp at 150 and 200 insert latency
1453        assert_eq!(
1454            exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1455            UnixNanos::from(350)
1456        );
1457
1458        // Process at timestamp 350, and test that only first command is processed
1459        exchange.borrow_mut().process(UnixNanos::from(320));
1460        assert_eq!(exchange.borrow().message_queue.len(), 0);
1461        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1462        assert_eq!(
1463            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1464            UnixNanos::from(350)
1465        );
1466    }
1467}