nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, HashMap, VecDeque},
25    fmt::Debug,
26    rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31    UnixNanos,
32    correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35    client::ExecutionClient,
36    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40    accounts::AccountAny,
41    data::{
42        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43        QuoteTick, TradeTick,
44    },
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orderbook::OrderBook,
49    orders::PassiveOrderAny,
50    types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
/// Represents commands with simulated network latency in a min-heap priority queue.
/// The commands are ordered by timestamp for FIFO processing, with the
/// earliest timestamp having the highest priority in the queue.
#[derive(Debug, Eq, PartialEq)]
struct InflightCommand {
    /// Timestamp at which the command becomes due for processing.
    ts: UnixNanos,
    /// Tie-breaker between commands sharing the same `ts` (lower is ordered first).
    counter: u32,
    /// The trading command being delayed.
    command: TradingCommand,
}
65
impl InflightCommand {
    /// Creates a new [`InflightCommand`] due at `ts`, with `counter` as the
    /// tie-breaker among commands that share the same timestamp.
    const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
        Self {
            ts,
            counter,
            command,
        }
    }
}
75
76impl Ord for InflightCommand {
77    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
79        other
80            .ts
81            .cmp(&self.ts)
82            .then_with(|| other.counter.cmp(&self.counter))
83    }
84}
85
86impl PartialOrd for InflightCommand {
87    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
/// Simulated exchange venue for realistic trading execution during backtesting.
///
/// The `SimulatedExchange` provides a comprehensive simulation of a trading venue,
/// including order matching engines, account management, and realistic execution
/// models. It maintains order books, processes market data, and executes trades
/// with configurable latency and fill models to accurately simulate real market
/// conditions during backtesting.
///
/// Key features:
/// - Multi-instrument order matching with realistic execution
/// - Configurable fee, fill, and latency models
/// - Support for various order types and execution options
/// - Account balance and position management
/// - Market data processing and order book maintenance
/// - Simulation modules for custom venue behaviors
pub struct SimulatedExchange {
    /// The venue this exchange simulates; instruments added must belong to it.
    pub id: Venue,
    /// The order management system type passed to each matching engine.
    pub oms_type: OmsType,
    /// The account type for the venue, passed to each matching engine.
    pub account_type: AccountType,
    /// Balances used to build a fresh account state.
    starting_balances: Vec<Money>,
    /// The order book type passed to each matching engine.
    book_type: BookType,
    /// Leverage applied as the default when the account is a margin account.
    default_leverage: Decimal,
    /// The execution client, set through `register_client` (initially `None`).
    exec_client: Option<Rc<dyn ExecutionClient>>,
    /// Optional base currency; when set, only a single starting balance is accepted.
    pub base_currency: Option<Currency>,
    /// Fee model cloned into each new matching engine.
    fee_model: FeeModelAny,
    /// Fill model cloned into each new matching engine.
    fill_model: FillModel,
    /// Optional latency model; when present (and the message queue is enabled),
    /// sent commands are delayed through the inflight queue.
    latency_model: Option<LatencyModel>,
    /// Instruments added to the exchange, keyed by instrument ID.
    instruments: HashMap<InstrumentId, InstrumentAny>,
    /// One matching engine per added instrument, keyed by instrument ID.
    matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
    /// Instrument-specific leverages applied when the account is a margin account.
    leverages: HashMap<InstrumentId, Decimal>,
    /// Simulation modules; each is given incoming market data before the matching engine.
    modules: Vec<Box<dyn SimulationModule>>,
    /// Clock used to timestamp generated account states and shared with matching engines.
    clock: Rc<RefCell<dyn Clock>>,
    /// Cache used to look up instruments and accounts, shared with matching engines.
    cache: Rc<RefCell<Cache>>,
    /// Commands waiting for the next `process` call (used when no latency model is set).
    message_queue: VecDeque<TradingCommand>,
    /// Latency-delayed commands, ordered by due timestamp then counter.
    inflight_queue: BinaryHeap<InflightCommand>,
    /// Number of inflight commands generated per due timestamp (source of the tie-break counter).
    inflight_counter: HashMap<UnixNanos, u32>,
    /// Forwarded to the matching engine configuration.
    bar_execution: bool,
    /// Forwarded to the matching engine configuration.
    reject_stop_orders: bool,
    /// Forwarded to the matching engine configuration.
    support_gtd_orders: bool,
    /// Forwarded to the matching engine configuration.
    support_contingent_orders: bool,
    /// Forwarded to the matching engine configuration.
    use_position_ids: bool,
    /// Forwarded to the matching engine configuration.
    use_random_ids: bool,
    /// Forwarded to the matching engine configuration.
    use_reduce_only: bool,
    /// When `false`, `send` processes commands immediately instead of queueing them.
    use_message_queue: bool,
    /// Stored from the constructor; not read by any code visible in this section.
    allow_cash_borrowing: bool,
    /// When `true`, `adjust_account` is a no-op.
    frozen_account: bool,
}
139
140impl Debug for SimulatedExchange {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct(stringify!(SimulatedExchange))
143            .field("id", &self.id)
144            .field("account_type", &self.account_type)
145            .finish()
146    }
147}
148
149impl SimulatedExchange {
150    /// Creates a new [`SimulatedExchange`] instance.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if:
155    /// - `starting_balances` is empty.
156    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
157    #[allow(clippy::too_many_arguments)]
158    pub fn new(
159        venue: Venue,
160        oms_type: OmsType,
161        account_type: AccountType,
162        starting_balances: Vec<Money>,
163        base_currency: Option<Currency>,
164        default_leverage: Decimal,
165        leverages: HashMap<InstrumentId, Decimal>,
166        modules: Vec<Box<dyn SimulationModule>>,
167        cache: Rc<RefCell<Cache>>,
168        clock: Rc<RefCell<dyn Clock>>,
169        fill_model: FillModel,
170        fee_model: FeeModelAny,
171        book_type: BookType,
172        latency_model: Option<LatencyModel>,
173        bar_execution: Option<bool>,
174        reject_stop_orders: Option<bool>,
175        support_gtd_orders: Option<bool>,
176        support_contingent_orders: Option<bool>,
177        use_position_ids: Option<bool>,
178        use_random_ids: Option<bool>,
179        use_reduce_only: Option<bool>,
180        use_message_queue: Option<bool>,
181        allow_cash_borrowing: Option<bool>,
182        frozen_account: Option<bool>,
183    ) -> anyhow::Result<Self> {
184        if starting_balances.is_empty() {
185            anyhow::bail!("Starting balances must be provided")
186        }
187        if base_currency.is_some() && starting_balances.len() > 1 {
188            anyhow::bail!("single-currency account has multiple starting currencies")
189        }
190        // TODO register and load modules
191        Ok(Self {
192            id: venue,
193            oms_type,
194            account_type,
195            starting_balances,
196            book_type,
197            default_leverage,
198            exec_client: None,
199            base_currency,
200            fee_model,
201            fill_model,
202            latency_model,
203            instruments: HashMap::new(),
204            matching_engines: HashMap::new(),
205            leverages,
206            modules,
207            clock,
208            cache,
209            message_queue: VecDeque::new(),
210            inflight_queue: BinaryHeap::new(),
211            inflight_counter: HashMap::new(),
212            bar_execution: bar_execution.unwrap_or(true),
213            reject_stop_orders: reject_stop_orders.unwrap_or(true),
214            support_gtd_orders: support_gtd_orders.unwrap_or(true),
215            support_contingent_orders: support_contingent_orders.unwrap_or(true),
216            use_position_ids: use_position_ids.unwrap_or(true),
217            use_random_ids: use_random_ids.unwrap_or(false),
218            use_reduce_only: use_reduce_only.unwrap_or(true),
219            use_message_queue: use_message_queue.unwrap_or(true),
220            allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
221            frozen_account: frozen_account.unwrap_or(false),
222        })
223    }
224
225    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
226        self.exec_client = Some(client);
227    }
228
229    pub fn set_fill_model(&mut self, fill_model: FillModel) {
230        for matching_engine in self.matching_engines.values_mut() {
231            matching_engine.set_fill_model(fill_model.clone());
232            log::info!(
233                "Setting fill model for {} to {}",
234                matching_engine.venue,
235                self.fill_model
236            );
237        }
238        self.fill_model = fill_model;
239    }
240
    /// Sets the latency model, replacing any existing one.
    ///
    /// With a latency model present (and the message queue enabled), commands
    /// passed to `send` are placed on the inflight queue rather than the message queue.
    pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
        self.latency_model = Some(latency_model);
    }
244
    /// Initializes the account by generating a fresh account state from the
    /// starting balances (delegates to `generate_fresh_account_state`).
    pub fn initialize_account(&mut self) {
        self.generate_fresh_account_state();
    }
248
249    /// Adds an instrument to the simulated exchange and initializes its matching engine.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
254    ///
255    /// # Panics
256    ///
257    /// Panics if the instrument cannot be added to the exchange.
258    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
259        check_equal(
260            &instrument.id().venue,
261            &self.id,
262            "Venue of instrument id",
263            "Venue of simulated exchange",
264        )
265        .expect(FAILED);
266
267        if self.account_type == AccountType::Cash
268            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
269                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
270        {
271            anyhow::bail!("Cash account cannot trade futures or perpetuals")
272        }
273
274        self.instruments.insert(instrument.id(), instrument.clone());
275
276        let matching_engine_config = OrderMatchingEngineConfig::new(
277            self.bar_execution,
278            self.reject_stop_orders,
279            self.support_gtd_orders,
280            self.support_contingent_orders,
281            self.use_position_ids,
282            self.use_random_ids,
283            self.use_reduce_only,
284        );
285        let instrument_id = instrument.id();
286        let matching_engine = OrderMatchingEngine::new(
287            instrument,
288            self.instruments.len() as u32,
289            self.fill_model.clone(),
290            self.fee_model.clone(),
291            self.book_type,
292            self.oms_type,
293            self.account_type,
294            self.clock.clone(),
295            Rc::clone(&self.cache),
296            matching_engine_config,
297        );
298        self.matching_engines.insert(instrument_id, matching_engine);
299
300        log::info!("Added instrument {instrument_id} and created matching engine");
301        Ok(())
302    }
303
304    #[must_use]
305    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
306        self.matching_engines
307            .get(&instrument_id)
308            .and_then(OrderMatchingEngine::best_bid_price)
309    }
310
311    #[must_use]
312    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
313        self.matching_engines
314            .get(&instrument_id)
315            .and_then(OrderMatchingEngine::best_ask_price)
316    }
317
318    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
319        self.matching_engines
320            .get(&instrument_id)
321            .map(OrderMatchingEngine::get_book)
322    }
323
    /// Returns a reference to the matching engine for `instrument_id`,
    /// or `None` if no engine exists for that instrument.
    #[must_use]
    pub fn get_matching_engine(
        &self,
        instrument_id: &InstrumentId,
    ) -> Option<&OrderMatchingEngine> {
        self.matching_engines.get(instrument_id)
    }
331
    /// Returns a reference to all matching engines, keyed by instrument ID.
    #[must_use]
    pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
        &self.matching_engines
    }
336
337    #[must_use]
338    pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
339        let mut books = HashMap::new();
340        for (instrument_id, matching_engine) in &self.matching_engines {
341            books.insert(*instrument_id, matching_engine.get_book().clone());
342        }
343        books
344    }
345
346    #[must_use]
347    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
348        instrument_id
349            .and_then(|id| {
350                self.matching_engines
351                    .get(&id)
352                    .map(OrderMatchingEngine::get_open_orders)
353            })
354            .unwrap_or_else(|| {
355                self.matching_engines
356                    .values()
357                    .flat_map(OrderMatchingEngine::get_open_orders)
358                    .collect()
359            })
360    }
361
362    #[must_use]
363    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
364        instrument_id
365            .and_then(|id| {
366                self.matching_engines
367                    .get(&id)
368                    .map(|engine| engine.get_open_bid_orders().to_vec())
369            })
370            .unwrap_or_else(|| {
371                self.matching_engines
372                    .values()
373                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
374                    .collect()
375            })
376    }
377
378    #[must_use]
379    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
380        instrument_id
381            .and_then(|id| {
382                self.matching_engines
383                    .get(&id)
384                    .map(|engine| engine.get_open_ask_orders().to_vec())
385            })
386            .unwrap_or_else(|| {
387                self.matching_engines
388                    .values()
389                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
390                    .collect()
391            })
392    }
393
394    /// # Panics
395    ///
396    /// Panics if retrieving the account from the execution client fails.
397    #[must_use]
398    pub fn get_account(&self) -> Option<AccountAny> {
399        self.exec_client
400            .as_ref()
401            .map(|client| client.get_account().unwrap())
402    }
403
404    /// # Panics
405    ///
406    /// Panics if generating account state fails during adjustment.
407    pub fn adjust_account(&mut self, adjustment: Money) {
408        if self.frozen_account {
409            // Nothing to adjust
410            return;
411        }
412
413        if let Some(exec_client) = &self.exec_client {
414            let venue = exec_client.venue();
415            println!("Adjusting account for venue {venue}");
416            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
417                match account.balance(Some(adjustment.currency)) {
418                    Some(balance) => {
419                        let mut current_balance = *balance;
420                        current_balance.total += adjustment;
421                        current_balance.free += adjustment;
422
423                        let margins = match account {
424                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
425                            _ => HashMap::new(),
426                        };
427
428                        if let Some(exec_client) = &self.exec_client {
429                            exec_client
430                                .generate_account_state(
431                                    vec![current_balance],
432                                    margins.values().copied().collect(),
433                                    true,
434                                    self.clock.borrow().timestamp_ns(),
435                                )
436                                .unwrap();
437                        }
438                    }
439                    None => {
440                        log::error!(
441                            "Cannot adjust account: no balance for currency {}",
442                            adjustment.currency
443                        );
444                    }
445                }
446            } else {
447                log::error!("Cannot adjust account: no account for venue {venue}");
448            }
449        }
450    }
451
452    pub fn send(&mut self, command: TradingCommand) {
453        if !self.use_message_queue {
454            self.process_trading_command(command);
455        } else if self.latency_model.is_none() {
456            self.message_queue.push_back(command);
457        } else {
458            let (ts, counter) = self.generate_inflight_command(&command);
459            self.inflight_queue
460                .push(InflightCommand::new(ts, counter, command));
461        }
462    }
463
464    /// # Panics
465    ///
466    /// Panics if the command is invalid when generating inflight command.
467    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
468        if let Some(latency_model) = &self.latency_model {
469            let ts = match command {
470                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
471                    command.ts_init() + latency_model.insert_latency_nanos
472                }
473                TradingCommand::ModifyOrder(_) => {
474                    command.ts_init() + latency_model.update_latency_nanos
475                }
476                TradingCommand::CancelOrder(_)
477                | TradingCommand::CancelAllOrders(_)
478                | TradingCommand::BatchCancelOrders(_) => {
479                    command.ts_init() + latency_model.delete_latency_nanos
480                }
481                _ => panic!("Cannot handle command: {command:?}"),
482            };
483
484            let counter = self
485                .inflight_counter
486                .entry(ts)
487                .and_modify(|e| *e += 1)
488                .or_insert(1);
489
490            (ts, *counter)
491        } else {
492            panic!("Latency model should be initialized");
493        }
494    }
495
496    /// # Panics
497    ///
498    /// Panics if adding a missing instrument during delta processing fails.
499    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
500        for module in &self.modules {
501            module.pre_process(Data::Delta(delta));
502        }
503
504        if !self.matching_engines.contains_key(&delta.instrument_id) {
505            let instrument = {
506                let cache = self.cache.as_ref().borrow();
507                cache.instrument(&delta.instrument_id).cloned()
508            };
509
510            if let Some(instrument) = instrument {
511                self.add_instrument(instrument).unwrap();
512            } else {
513                panic!(
514                    "No matching engine found for instrument {}",
515                    delta.instrument_id
516                );
517            }
518        }
519
520        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
521            matching_engine.process_order_book_delta(&delta);
522        } else {
523            panic!("Matching engine should be initialized");
524        }
525    }
526
527    /// # Panics
528    ///
529    /// Panics if adding a missing instrument during deltas processing fails.
530    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
531        for module in &self.modules {
532            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
533        }
534
535        if !self.matching_engines.contains_key(&deltas.instrument_id) {
536            let instrument = {
537                let cache = self.cache.as_ref().borrow();
538                cache.instrument(&deltas.instrument_id).cloned()
539            };
540
541            if let Some(instrument) = instrument {
542                self.add_instrument(instrument).unwrap();
543            } else {
544                panic!(
545                    "No matching engine found for instrument {}",
546                    deltas.instrument_id
547                );
548            }
549        }
550
551        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
552            matching_engine.process_order_book_deltas(&deltas);
553        } else {
554            panic!("Matching engine should be initialized");
555        }
556    }
557
558    /// # Panics
559    ///
560    /// Panics if adding a missing instrument during quote tick processing fails.
561    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
562        for module in &self.modules {
563            module.pre_process(Data::Quote(quote.to_owned()));
564        }
565
566        if !self.matching_engines.contains_key(&quote.instrument_id) {
567            let instrument = {
568                let cache = self.cache.as_ref().borrow();
569                cache.instrument(&quote.instrument_id).cloned()
570            };
571
572            if let Some(instrument) = instrument {
573                self.add_instrument(instrument).unwrap();
574            } else {
575                panic!(
576                    "No matching engine found for instrument {}",
577                    quote.instrument_id
578                );
579            }
580        }
581
582        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
583            matching_engine.process_quote_tick(quote);
584        } else {
585            panic!("Matching engine should be initialized");
586        }
587    }
588
589    /// # Panics
590    ///
591    /// Panics if adding a missing instrument during trade tick processing fails.
592    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
593        for module in &self.modules {
594            module.pre_process(Data::Trade(trade.to_owned()));
595        }
596
597        if !self.matching_engines.contains_key(&trade.instrument_id) {
598            let instrument = {
599                let cache = self.cache.as_ref().borrow();
600                cache.instrument(&trade.instrument_id).cloned()
601            };
602
603            if let Some(instrument) = instrument {
604                self.add_instrument(instrument).unwrap();
605            } else {
606                panic!(
607                    "No matching engine found for instrument {}",
608                    trade.instrument_id
609                );
610            }
611        }
612
613        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
614            matching_engine.process_trade_tick(trade);
615        } else {
616            panic!("Matching engine should be initialized");
617        }
618    }
619
620    /// # Panics
621    ///
622    /// Panics if adding a missing instrument during bar processing fails.
623    pub fn process_bar(&mut self, bar: Bar) {
624        for module in &self.modules {
625            module.pre_process(Data::Bar(bar));
626        }
627
628        if !self.matching_engines.contains_key(&bar.instrument_id()) {
629            let instrument = {
630                let cache = self.cache.as_ref().borrow();
631                cache.instrument(&bar.instrument_id()).cloned()
632            };
633
634            if let Some(instrument) = instrument {
635                self.add_instrument(instrument).unwrap();
636            } else {
637                panic!(
638                    "No matching engine found for instrument {}",
639                    bar.instrument_id()
640                );
641            }
642        }
643
644        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
645            matching_engine.process_bar(&bar);
646        } else {
647            panic!("Matching engine should be initialized");
648        }
649    }
650
651    /// # Panics
652    ///
653    /// Panics if adding a missing instrument during instrument status processing fails.
654    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
655        // TODO add module preprocessing
656
657        if !self.matching_engines.contains_key(&status.instrument_id) {
658            let instrument = {
659                let cache = self.cache.as_ref().borrow();
660                cache.instrument(&status.instrument_id).cloned()
661            };
662
663            if let Some(instrument) = instrument {
664                self.add_instrument(instrument).unwrap();
665            } else {
666                panic!(
667                    "No matching engine found for instrument {}",
668                    status.instrument_id
669                );
670            }
671        }
672
673        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
674            matching_engine.process_status(status.action);
675        } else {
676            panic!("Matching engine should be initialized");
677        }
678    }
679
680    /// # Panics
681    ///
682    /// Panics if popping an inflight command fails during processing.
683    pub fn process(&mut self, ts_now: UnixNanos) {
684        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
685
686        // Process inflight commands
687        while let Some(inflight) = self.inflight_queue.peek() {
688            if inflight.ts > ts_now {
689                // Future commands remain in the queue
690                break;
691            }
692            // We get the inflight command, remove it from the queue and process it
693            let inflight = self.inflight_queue.pop().unwrap();
694            self.process_trading_command(inflight.command);
695        }
696
697        // Process regular message queue
698        while let Some(command) = self.message_queue.pop_front() {
699            self.process_trading_command(command);
700        }
701    }
702
703    pub fn reset(&mut self) {
704        for module in &self.modules {
705            module.reset();
706        }
707
708        self.generate_fresh_account_state();
709
710        for matching_engine in self.matching_engines.values_mut() {
711            matching_engine.reset();
712        }
713
714        // TODO Clear the inflight and message queues
715        log::info!("Resetting exchange state");
716    }
717
718    /// # Panics
719    ///
720    /// Panics if execution client is uninitialized when processing trading command.
721    pub fn process_trading_command(&mut self, command: TradingCommand) {
722        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
723            let account_id = if let Some(exec_client) = &self.exec_client {
724                exec_client.account_id()
725            } else {
726                panic!("Execution client should be initialized");
727            };
728            match command {
729                TradingCommand::SubmitOrder(mut command) => {
730                    matching_engine.process_order(&mut command.order, account_id);
731                }
732                TradingCommand::ModifyOrder(ref command) => {
733                    matching_engine.process_modify(command, account_id);
734                }
735                TradingCommand::CancelOrder(ref command) => {
736                    matching_engine.process_cancel(command, account_id);
737                }
738                TradingCommand::CancelAllOrders(ref command) => {
739                    matching_engine.process_cancel_all(command, account_id);
740                }
741                TradingCommand::BatchCancelOrders(ref command) => {
742                    matching_engine.process_batch_cancel(command, account_id);
743                }
744                TradingCommand::SubmitOrderList(mut command) => {
745                    for order in &mut command.order_list.orders {
746                        matching_engine.process_order(order, account_id);
747                    }
748                }
749                _ => {}
750            }
751        } else {
752            panic!(
753                "Matching engine not found for instrument {}",
754                command.instrument_id()
755            );
756        }
757    }
758
759    /// # Panics
760    ///
761    /// Panics if generating fresh account state fails.
762    pub fn generate_fresh_account_state(&self) {
763        let balances: Vec<AccountBalance> = self
764            .starting_balances
765            .iter()
766            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
767            .collect();
768
769        if let Some(exec_client) = &self.exec_client {
770            exec_client
771                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
772                .unwrap();
773        }
774
775        // Set leverages
776        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
777            margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
778
779            // Set instrument specific leverages
780            for (instrument_id, leverage) in &self.leverages {
781                margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
782            }
783        }
784    }
785}
786
787////////////////////////////////////////////////////////////////////////////////
788// Tests
789////////////////////////////////////////////////////////////////////////////////
790
791#[cfg(test)]
792mod tests {
793    use std::{
794        cell::RefCell,
795        collections::{BinaryHeap, HashMap},
796        rc::Rc,
797        sync::LazyLock,
798    };
799
800    use nautilus_common::{
801        cache::Cache,
802        clock::TestClock,
803        messages::execution::{SubmitOrder, TradingCommand},
804        msgbus::{
805            self,
806            stubs::{get_message_saving_handler, get_saved_messages},
807        },
808    };
809    use nautilus_core::{AtomicTime, UUID4, UnixNanos};
810    use nautilus_execution::models::{
811        fee::{FeeModelAny, MakerTakerFeeModel},
812        fill::FillModel,
813        latency::LatencyModel,
814    };
815    use nautilus_model::{
816        accounts::{AccountAny, MarginAccount},
817        data::{
818            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
819            TradeTick,
820        },
821        enums::{
822            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
823            OmsType, OrderSide, OrderType,
824        },
825        events::AccountState,
826        identifiers::{
827            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
828            VenueOrderId,
829        },
830        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
831        orders::OrderTestBuilder,
832        types::{AccountBalance, Currency, Money, Price, Quantity},
833    };
834    use rstest::rstest;
835
836    use crate::{
837        exchange::{InflightCommand, SimulatedExchange},
838        execution_client::BacktestExecutionClient,
839    };
840
    // Lazily constructed `AtomicTime`, built on first access from `true` and a default
    // `UnixNanos`.
    // NOTE(review): the boolean presumably selects real-time mode; confirm against
    // `AtomicTime::new`. Not referenced by the tests visible in this module.
    static ATOMIC_TIME: LazyLock<AtomicTime> =
        LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
843
844    fn get_exchange(
845        venue: Venue,
846        account_type: AccountType,
847        book_type: BookType,
848        cache: Option<Rc<RefCell<Cache>>>,
849    ) -> Rc<RefCell<SimulatedExchange>> {
850        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
851        let clock = Rc::new(RefCell::new(TestClock::new()));
852        let exchange = Rc::new(RefCell::new(
853            SimulatedExchange::new(
854                venue,
855                OmsType::Netting,
856                account_type,
857                vec![Money::new(1000.0, Currency::USD())],
858                None,
859                1.into(),
860                HashMap::new(),
861                vec![],
862                cache.clone(),
863                clock,
864                FillModel::default(),
865                FeeModelAny::MakerTaker(MakerTakerFeeModel),
866                book_type,
867                None,
868                None,
869                None,
870                None,
871                None,
872                None,
873                None,
874                None,
875                None,
876                None,
877                None,
878            )
879            .unwrap(),
880        ));
881
882        let clock = TestClock::new();
883        let execution_client = BacktestExecutionClient::new(
884            TraderId::default(),
885            AccountId::default(),
886            exchange.clone(),
887            cache,
888            Rc::new(RefCell::new(clock)),
889            None,
890            None,
891        );
892        exchange
893            .borrow_mut()
894            .register_client(Rc::new(execution_client));
895
896        exchange
897    }
898
899    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
900        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
901        let order = OrderTestBuilder::new(OrderType::Market)
902            .instrument_id(instrument_id)
903            .quantity(Quantity::from(1))
904            .build();
905        TradingCommand::SubmitOrder(
906            SubmitOrder::new(
907                TraderId::default(),
908                ClientId::default(),
909                StrategyId::default(),
910                instrument_id,
911                ClientOrderId::default(),
912                VenueOrderId::default(),
913                order,
914                None,
915                None,
916                UUID4::default(),
917                ts_init,
918            )
919            .unwrap(),
920        )
921    }
922
923    #[rstest]
924    #[should_panic(
925        expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
926    )]
927    fn test_venue_mismatch_between_exchange_and_instrument(
928        crypto_perpetual_ethusdt: CryptoPerpetual,
929    ) {
930        let exchange = get_exchange(
931            Venue::new("SIM"),
932            AccountType::Margin,
933            BookType::L1_MBP,
934            None,
935        );
936        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
937        exchange.borrow_mut().add_instrument(instrument).unwrap();
938    }
939
940    #[rstest]
941    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
942    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
943        let exchange = get_exchange(
944            Venue::new("BINANCE"),
945            AccountType::Cash,
946            BookType::L1_MBP,
947            None,
948        );
949        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
950        exchange.borrow_mut().add_instrument(instrument).unwrap();
951    }
952
953    #[rstest]
954    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
955        let exchange = get_exchange(
956            Venue::new("BINANCE"),
957            AccountType::Margin,
958            BookType::L1_MBP,
959            None,
960        );
961        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
962
963        // register instrument
964        exchange.borrow_mut().add_instrument(instrument).unwrap();
965
966        // process tick
967        let quote_tick = QuoteTick::new(
968            crypto_perpetual_ethusdt.id,
969            Price::from("1000"),
970            Price::from("1001"),
971            Quantity::from(1),
972            Quantity::from(1),
973            UnixNanos::default(),
974            UnixNanos::default(),
975        );
976        exchange.borrow_mut().process_quote_tick(&quote_tick);
977
978        let best_bid_price = exchange
979            .borrow()
980            .best_bid_price(crypto_perpetual_ethusdt.id);
981        assert_eq!(best_bid_price, Some(Price::from("1000")));
982        let best_ask_price = exchange
983            .borrow()
984            .best_ask_price(crypto_perpetual_ethusdt.id);
985        assert_eq!(best_ask_price, Some(Price::from("1001")));
986    }
987
988    #[rstest]
989    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
990        let exchange = get_exchange(
991            Venue::new("BINANCE"),
992            AccountType::Margin,
993            BookType::L1_MBP,
994            None,
995        );
996        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
997
998        // register instrument
999        exchange.borrow_mut().add_instrument(instrument).unwrap();
1000
1001        // process tick
1002        let trade_tick = TradeTick::new(
1003            crypto_perpetual_ethusdt.id,
1004            Price::from("1000"),
1005            Quantity::from(1),
1006            AggressorSide::Buyer,
1007            TradeId::from("1"),
1008            UnixNanos::default(),
1009            UnixNanos::default(),
1010        );
1011        exchange.borrow_mut().process_trade_tick(&trade_tick);
1012
1013        let best_bid_price = exchange
1014            .borrow()
1015            .best_bid_price(crypto_perpetual_ethusdt.id);
1016        assert_eq!(best_bid_price, Some(Price::from("1000")));
1017        let best_ask = exchange
1018            .borrow()
1019            .best_ask_price(crypto_perpetual_ethusdt.id);
1020        assert_eq!(best_ask, Some(Price::from("1000")));
1021    }
1022
1023    #[rstest]
1024    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1025        let exchange = get_exchange(
1026            Venue::new("BINANCE"),
1027            AccountType::Margin,
1028            BookType::L1_MBP,
1029            None,
1030        );
1031        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1032
1033        // register instrument
1034        exchange.borrow_mut().add_instrument(instrument).unwrap();
1035
1036        // process bar
1037        let bar = Bar::new(
1038            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1039            Price::from("1500.00"),
1040            Price::from("1505.00"),
1041            Price::from("1490.00"),
1042            Price::from("1502.00"),
1043            Quantity::from(100),
1044            UnixNanos::default(),
1045            UnixNanos::default(),
1046        );
1047        exchange.borrow_mut().process_bar(bar);
1048
1049        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1050        let best_bid_price = exchange
1051            .borrow()
1052            .best_bid_price(crypto_perpetual_ethusdt.id);
1053        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1054        let best_ask_price = exchange
1055            .borrow()
1056            .best_ask_price(crypto_perpetual_ethusdt.id);
1057        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1058    }
1059
1060    #[rstest]
1061    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1062        let exchange = get_exchange(
1063            Venue::new("BINANCE"),
1064            AccountType::Margin,
1065            BookType::L1_MBP,
1066            None,
1067        );
1068        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1069
1070        // register instrument
1071        exchange.borrow_mut().add_instrument(instrument).unwrap();
1072
1073        // create both bid and ask based bars
1074        // add +1 on ask to make sure it is different from bid
1075        let bar_bid = Bar::new(
1076            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1077            Price::from("1500.00"),
1078            Price::from("1505.00"),
1079            Price::from("1490.00"),
1080            Price::from("1502.00"),
1081            Quantity::from(100),
1082            UnixNanos::from(1),
1083            UnixNanos::from(1),
1084        );
1085        let bar_ask = Bar::new(
1086            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1087            Price::from("1501.00"),
1088            Price::from("1506.00"),
1089            Price::from("1491.00"),
1090            Price::from("1503.00"),
1091            Quantity::from(100),
1092            UnixNanos::from(1),
1093            UnixNanos::from(1),
1094        );
1095
1096        // process them
1097        exchange.borrow_mut().process_bar(bar_bid);
1098        exchange.borrow_mut().process_bar(bar_ask);
1099
1100        // current bid and ask prices will be the corresponding close of the ask and bid bar
1101        let best_bid_price = exchange
1102            .borrow()
1103            .best_bid_price(crypto_perpetual_ethusdt.id);
1104        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1105        let best_ask_price = exchange
1106            .borrow()
1107            .best_ask_price(crypto_perpetual_ethusdt.id);
1108        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1109    }
1110
1111    #[rstest]
1112    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1113        let exchange = get_exchange(
1114            Venue::new("BINANCE"),
1115            AccountType::Margin,
1116            BookType::L2_MBP,
1117            None,
1118        );
1119        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1120
1121        // register instrument
1122        exchange.borrow_mut().add_instrument(instrument).unwrap();
1123
1124        // create order book delta at both bid and ask with incremented ts init and sequence
1125        let delta_buy = OrderBookDelta::new(
1126            crypto_perpetual_ethusdt.id,
1127            BookAction::Add,
1128            BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1129            0,
1130            0,
1131            UnixNanos::from(1),
1132            UnixNanos::from(1),
1133        );
1134        let delta_sell = OrderBookDelta::new(
1135            crypto_perpetual_ethusdt.id,
1136            BookAction::Add,
1137            BookOrder::new(
1138                OrderSide::Sell,
1139                Price::from("1001.00"),
1140                Quantity::from(1),
1141                1,
1142            ),
1143            0,
1144            1,
1145            UnixNanos::from(2),
1146            UnixNanos::from(2),
1147        );
1148
1149        // process both deltas
1150        exchange.borrow_mut().process_order_book_delta(delta_buy);
1151        exchange.borrow_mut().process_order_book_delta(delta_sell);
1152
1153        let book = exchange
1154            .borrow()
1155            .get_book(crypto_perpetual_ethusdt.id)
1156            .unwrap()
1157            .clone();
1158        assert_eq!(book.update_count, 2);
1159        assert_eq!(book.sequence, 1);
1160        assert_eq!(book.ts_last, UnixNanos::from(2));
1161        let best_bid_price = exchange
1162            .borrow()
1163            .best_bid_price(crypto_perpetual_ethusdt.id);
1164        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1165        let best_ask_price = exchange
1166            .borrow()
1167            .best_ask_price(crypto_perpetual_ethusdt.id);
1168        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1169    }
1170
1171    #[rstest]
1172    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1173        let exchange = get_exchange(
1174            Venue::new("BINANCE"),
1175            AccountType::Margin,
1176            BookType::L2_MBP,
1177            None,
1178        );
1179        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1180
1181        // register instrument
1182        exchange.borrow_mut().add_instrument(instrument).unwrap();
1183
1184        // create two sell order book deltas with same timestamps and higher sequence
1185        let delta_sell_1 = OrderBookDelta::new(
1186            crypto_perpetual_ethusdt.id,
1187            BookAction::Add,
1188            BookOrder::new(
1189                OrderSide::Sell,
1190                Price::from("1000.00"),
1191                Quantity::from(3),
1192                1,
1193            ),
1194            0,
1195            0,
1196            UnixNanos::from(1),
1197            UnixNanos::from(1),
1198        );
1199        let delta_sell_2 = OrderBookDelta::new(
1200            crypto_perpetual_ethusdt.id,
1201            BookAction::Add,
1202            BookOrder::new(
1203                OrderSide::Sell,
1204                Price::from("1001.00"),
1205                Quantity::from(1),
1206                1,
1207            ),
1208            0,
1209            1,
1210            UnixNanos::from(1),
1211            UnixNanos::from(1),
1212        );
1213        let orderbook_deltas = OrderBookDeltas::new(
1214            crypto_perpetual_ethusdt.id,
1215            vec![delta_sell_1, delta_sell_2],
1216        );
1217
1218        // process both deltas
1219        exchange
1220            .borrow_mut()
1221            .process_order_book_deltas(orderbook_deltas);
1222
1223        let book = exchange
1224            .borrow()
1225            .get_book(crypto_perpetual_ethusdt.id)
1226            .unwrap()
1227            .clone();
1228        assert_eq!(book.update_count, 2);
1229        assert_eq!(book.sequence, 1);
1230        assert_eq!(book.ts_last, UnixNanos::from(1));
1231        let best_bid_price = exchange
1232            .borrow()
1233            .best_bid_price(crypto_perpetual_ethusdt.id);
1234        // no bid orders in orderbook deltas
1235        assert_eq!(best_bid_price, None);
1236        let best_ask_price = exchange
1237            .borrow()
1238            .best_ask_price(crypto_perpetual_ethusdt.id);
1239        // best ask price is the first order in orderbook deltas
1240        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1241    }
1242
1243    #[rstest]
1244    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1245        let exchange = get_exchange(
1246            Venue::new("BINANCE"),
1247            AccountType::Margin,
1248            BookType::L2_MBP,
1249            None,
1250        );
1251        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1252
1253        // register instrument
1254        exchange.borrow_mut().add_instrument(instrument).unwrap();
1255
1256        let instrument_status = InstrumentStatus::new(
1257            crypto_perpetual_ethusdt.id,
1258            MarketStatusAction::Close, // close the market
1259            UnixNanos::from(1),
1260            UnixNanos::from(1),
1261            None,
1262            None,
1263            None,
1264            None,
1265            None,
1266        );
1267
1268        exchange
1269            .borrow_mut()
1270            .process_instrument_status(instrument_status);
1271
1272        let market_status = exchange
1273            .borrow()
1274            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1275            .unwrap()
1276            .market_status;
1277        assert_eq!(market_status, MarketStatus::Closed);
1278    }
1279
1280    #[rstest]
1281    fn test_accounting() {
1282        let account_type = AccountType::Margin;
1283        let mut cache = Cache::default();
1284        let handler = get_message_saving_handler::<AccountState>(None);
1285        msgbus::register("Portfolio.update_account".into(), handler.clone());
1286        let margin_account = MarginAccount::new(
1287            AccountState::new(
1288                AccountId::from("SIM-001"),
1289                account_type,
1290                vec![AccountBalance::new(
1291                    Money::from("1000 USD"),
1292                    Money::from("0 USD"),
1293                    Money::from("1000 USD"),
1294                )],
1295                vec![],
1296                false,
1297                UUID4::default(),
1298                UnixNanos::default(),
1299                UnixNanos::default(),
1300                None,
1301            ),
1302            false,
1303        );
1304        let () = cache
1305            .add_account(AccountAny::Margin(margin_account))
1306            .unwrap();
1307        // build indexes
1308        cache.build_index();
1309
1310        let exchange = get_exchange(
1311            Venue::new("SIM"),
1312            account_type,
1313            BookType::L2_MBP,
1314            Some(Rc::new(RefCell::new(cache))),
1315        );
1316        exchange.borrow_mut().initialize_account();
1317
1318        // Test adjust account, increase balance by 500 USD
1319        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1320
1321        // Check if we received two messages, one for initial account state and one for adjusted account state
1322        let messages = get_saved_messages::<AccountState>(handler);
1323        assert_eq!(messages.len(), 2);
1324        let account_state_first = messages.first().unwrap();
1325        let account_state_second = messages.last().unwrap();
1326
1327        assert_eq!(account_state_first.balances.len(), 1);
1328        let current_balance = account_state_first.balances[0];
1329        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1330        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1331        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1332
1333        assert_eq!(account_state_second.balances.len(), 1);
1334        let current_balance = account_state_second.balances[0];
1335        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1336        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1337        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1338    }
1339
1340    #[rstest]
1341    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1342        // Create 3 inflight commands with different timestamps and counters
1343        let inflight1 = InflightCommand::new(
1344            UnixNanos::from(100),
1345            1,
1346            create_submit_order_command(UnixNanos::from(100)),
1347        );
1348        let inflight2 = InflightCommand::new(
1349            UnixNanos::from(200),
1350            2,
1351            create_submit_order_command(UnixNanos::from(200)),
1352        );
1353        let inflight3 = InflightCommand::new(
1354            UnixNanos::from(100),
1355            2,
1356            create_submit_order_command(UnixNanos::from(100)),
1357        );
1358
1359        // Create a binary heap and push the inflight commands
1360        let mut inflight_heap = BinaryHeap::new();
1361        inflight_heap.push(inflight1);
1362        inflight_heap.push(inflight2);
1363        inflight_heap.push(inflight3);
1364
1365        // Pop the inflight commands and check if they are in the correct order
1366        // by our custom ordering with counter and timestamp
1367        let first = inflight_heap.pop().unwrap();
1368        let second = inflight_heap.pop().unwrap();
1369        let third = inflight_heap.pop().unwrap();
1370
1371        assert_eq!(first.ts, UnixNanos::from(100));
1372        assert_eq!(first.counter, 1);
1373        assert_eq!(second.ts, UnixNanos::from(100));
1374        assert_eq!(second.counter, 2);
1375        assert_eq!(third.ts, UnixNanos::from(200));
1376        assert_eq!(third.counter, 2);
1377    }
1378
1379    #[rstest]
1380    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1381        let exchange = get_exchange(
1382            Venue::new("BINANCE"),
1383            AccountType::Margin,
1384            BookType::L2_MBP,
1385            None,
1386        );
1387
1388        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1389        exchange.borrow_mut().add_instrument(instrument).unwrap();
1390
1391        let command1 = create_submit_order_command(UnixNanos::from(100));
1392        let command2 = create_submit_order_command(UnixNanos::from(200));
1393
1394        exchange.borrow_mut().send(command1);
1395        exchange.borrow_mut().send(command2);
1396
1397        // Verify that message queue has 2 commands and inflight queue is empty
1398        // as we are not using latency model
1399        assert_eq!(exchange.borrow().message_queue.len(), 2);
1400        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1401
1402        // Process command and check that queues is empty
1403        exchange.borrow_mut().process(UnixNanos::from(300));
1404        assert_eq!(exchange.borrow().message_queue.len(), 0);
1405        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1406    }
1407
1408    #[rstest]
1409    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1410        let latency_model = LatencyModel::new(
1411            UnixNanos::from(100),
1412            UnixNanos::from(200),
1413            UnixNanos::from(300),
1414            UnixNanos::from(100),
1415        );
1416        let exchange = get_exchange(
1417            Venue::new("BINANCE"),
1418            AccountType::Margin,
1419            BookType::L2_MBP,
1420            None,
1421        );
1422        exchange.borrow_mut().set_latency_model(latency_model);
1423
1424        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1425        exchange.borrow_mut().add_instrument(instrument).unwrap();
1426
1427        let command1 = create_submit_order_command(UnixNanos::from(100));
1428        let command2 = create_submit_order_command(UnixNanos::from(150));
1429        exchange.borrow_mut().send(command1);
1430        exchange.borrow_mut().send(command2);
1431
1432        // Verify that inflight queue has 2 commands and message queue is empty
1433        assert_eq!(exchange.borrow().message_queue.len(), 0);
1434        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1435        // First inflight command should have timestamp at 100 and 200 insert latency
1436        assert_eq!(
1437            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1438            UnixNanos::from(300)
1439        );
1440        // Second inflight command should have timestamp at 150 and 200 insert latency
1441        assert_eq!(
1442            exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1443            UnixNanos::from(350)
1444        );
1445
1446        // Process at timestamp 350, and test that only first command is processed
1447        exchange.borrow_mut().process(UnixNanos::from(320));
1448        assert_eq!(exchange.borrow().message_queue.len(), 0);
1449        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1450        assert_eq!(
1451            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1452            UnixNanos::from(350)
1453        );
1454    }
1455}