Skip to main content

nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18use std::{
19    cell::RefCell,
20    collections::{BinaryHeap, VecDeque},
21    fmt::Debug,
22    rc::Rc,
23};
24
25use ahash::AHashMap;
26use nautilus_common::{
27    cache::Cache, clients::ExecutionClient, clock::Clock, messages::execution::TradingCommand,
28};
29use nautilus_core::{
30    UnixNanos,
31    correctness::{FAILED, check_equal},
32};
33use nautilus_execution::{
34    matching_core::OrderMatchInfo,
35    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
36    models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel},
37};
38use nautilus_model::{
39    accounts::AccountAny,
40    data::{
41        Bar, Data, InstrumentClose, InstrumentStatus, OrderBookDelta, OrderBookDeltas,
42        OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
43    },
44    enums::{AccountType, BookType, OmsType},
45    identifiers::{InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47    orderbook::OrderBook,
48    orders::OrderAny,
49    types::{AccountBalance, Currency, Money, Price},
50};
51use rust_decimal::Decimal;
52
53use crate::modules::SimulationModule;
54
55/// Represents commands with simulated network latency in a min-heap priority queue.
56/// The commands are ordered by timestamp for FIFO processing, with the
57/// earliest timestamp having the highest priority in the queue.
58#[derive(Debug, Eq, PartialEq)]
59struct InflightCommand {
60    timestamp: UnixNanos,
61    counter: u32,
62    command: TradingCommand,
63}
64
65impl InflightCommand {
66    const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
67        Self {
68            timestamp,
69            counter,
70            command,
71        }
72    }
73}
74
75impl Ord for InflightCommand {
76    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
77        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
78        other
79            .timestamp
80            .cmp(&self.timestamp)
81            .then_with(|| other.counter.cmp(&self.counter))
82    }
83}
84
85impl PartialOrd for InflightCommand {
86    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
87        Some(self.cmp(other))
88    }
89}
90
91/// Simulated exchange venue for realistic trading execution during backtesting.
92///
93/// The `SimulatedExchange` provides a comprehensive simulation of a trading venue,
94/// including order matching engines, account management, and realistic execution
95/// models. It maintains order books, processes market data, and executes trades
96/// with configurable latency and fill models to accurately simulate real market
97/// conditions during backtesting.
98///
99/// Key features:
100/// - Multi-instrument order matching with realistic execution
101/// - Configurable fee, fill, and latency models
102/// - Support for various order types and execution options
103/// - Account balance and position management
104/// - Market data processing and order book maintenance
105/// - Simulation modules for custom venue behaviors
106pub struct SimulatedExchange {
107    pub id: Venue,
108    pub oms_type: OmsType,
109    pub account_type: AccountType,
110    starting_balances: Vec<Money>,
111    book_type: BookType,
112    default_leverage: Decimal,
113    exec_client: Option<Rc<dyn ExecutionClient>>,
114    pub base_currency: Option<Currency>,
115    fee_model: FeeModelAny,
116    fill_model: FillModelAny,
117    latency_model: Option<Box<dyn LatencyModel>>,
118    instruments: AHashMap<InstrumentId, InstrumentAny>,
119    matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
120    leverages: AHashMap<InstrumentId, Decimal>,
121    modules: Vec<Box<dyn SimulationModule>>,
122    clock: Rc<RefCell<dyn Clock>>,
123    cache: Rc<RefCell<Cache>>,
124    message_queue: VecDeque<TradingCommand>,
125    inflight_queue: BinaryHeap<InflightCommand>,
126    inflight_counter: AHashMap<UnixNanos, u32>,
127    bar_execution: bool,
128    bar_adaptive_high_low_ordering: bool,
129    trade_execution: bool,
130    liquidity_consumption: bool,
131    reject_stop_orders: bool,
132    support_gtd_orders: bool,
133    support_contingent_orders: bool,
134    use_position_ids: bool,
135    use_random_ids: bool,
136    use_reduce_only: bool,
137    use_message_queue: bool,
138    use_market_order_acks: bool,
139    _allow_cash_borrowing: bool,
140    frozen_account: bool,
141    price_protection_points: u32,
142}
143
144impl Debug for SimulatedExchange {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct(stringify!(SimulatedExchange))
147            .field("id", &self.id)
148            .field("account_type", &self.account_type)
149            .finish()
150    }
151}
152
153impl SimulatedExchange {
154    /// Creates a new [`SimulatedExchange`] instance.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if:
159    /// - `starting_balances` is empty.
160    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
161    #[allow(clippy::too_many_arguments)]
162    pub fn new(
163        venue: Venue,
164        oms_type: OmsType,
165        account_type: AccountType,
166        starting_balances: Vec<Money>,
167        base_currency: Option<Currency>,
168        default_leverage: Decimal,
169        leverages: AHashMap<InstrumentId, Decimal>,
170        modules: Vec<Box<dyn SimulationModule>>,
171        cache: Rc<RefCell<Cache>>,
172        clock: Rc<RefCell<dyn Clock>>,
173        fill_model: FillModelAny,
174        fee_model: FeeModelAny,
175        book_type: BookType,
176        latency_model: Option<Box<dyn LatencyModel>>,
177        bar_execution: Option<bool>,
178        bar_adaptive_high_low_ordering: Option<bool>,
179        trade_execution: Option<bool>,
180        liquidity_consumption: Option<bool>,
181        reject_stop_orders: Option<bool>,
182        support_gtd_orders: Option<bool>,
183        support_contingent_orders: Option<bool>,
184        use_position_ids: Option<bool>,
185        use_random_ids: Option<bool>,
186        use_reduce_only: Option<bool>,
187        use_message_queue: Option<bool>,
188        use_market_order_acks: Option<bool>,
189        allow_cash_borrowing: Option<bool>,
190        frozen_account: Option<bool>,
191        price_protection_points: Option<u32>,
192    ) -> anyhow::Result<Self> {
193        if starting_balances.is_empty() {
194            anyhow::bail!("Starting balances must be provided")
195        }
196        if base_currency.is_some() && starting_balances.len() > 1 {
197            anyhow::bail!("single-currency account has multiple starting currencies")
198        }
199        // TODO register and load modules
200        Ok(Self {
201            id: venue,
202            oms_type,
203            account_type,
204            starting_balances,
205            book_type,
206            default_leverage,
207            exec_client: None,
208            base_currency,
209            fee_model,
210            fill_model,
211            latency_model,
212            instruments: AHashMap::new(),
213            matching_engines: AHashMap::new(),
214            leverages,
215            modules,
216            clock,
217            cache,
218            message_queue: VecDeque::new(),
219            inflight_queue: BinaryHeap::new(),
220            inflight_counter: AHashMap::new(),
221            bar_execution: bar_execution.unwrap_or(true),
222            bar_adaptive_high_low_ordering: bar_adaptive_high_low_ordering.unwrap_or(false),
223            trade_execution: trade_execution.unwrap_or(true),
224            liquidity_consumption: liquidity_consumption.unwrap_or(true),
225            reject_stop_orders: reject_stop_orders.unwrap_or(true),
226            support_gtd_orders: support_gtd_orders.unwrap_or(true),
227            support_contingent_orders: support_contingent_orders.unwrap_or(true),
228            use_position_ids: use_position_ids.unwrap_or(true),
229            use_random_ids: use_random_ids.unwrap_or(false),
230            use_reduce_only: use_reduce_only.unwrap_or(true),
231            use_message_queue: use_message_queue.unwrap_or(true),
232            use_market_order_acks: use_market_order_acks.unwrap_or(false),
233            _allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
234            frozen_account: frozen_account.unwrap_or(false),
235            price_protection_points: price_protection_points.unwrap_or(0),
236        })
237    }
238
239    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
240        self.exec_client = Some(client);
241    }
242
243    pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
244        for matching_engine in self.matching_engines.values_mut() {
245            matching_engine.set_fill_model(fill_model.clone());
246            log::info!(
247                "Setting fill model for {} to {}",
248                matching_engine.venue,
249                self.fill_model
250            );
251        }
252        self.fill_model = fill_model;
253    }
254
255    pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
256        self.latency_model = Some(latency_model);
257    }
258
259    pub fn initialize_account(&mut self) {
260        self.generate_fresh_account_state();
261    }
262
263    /// Adds an instrument to the simulated exchange and initializes its matching engine.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
268    ///
269    /// # Panics
270    ///
271    /// Panics if the instrument cannot be added to the exchange.
272    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
273        check_equal(
274            &instrument.id().venue,
275            &self.id,
276            "Venue of instrument id",
277            "Venue of simulated exchange",
278        )
279        .expect(FAILED);
280
281        if self.account_type == AccountType::Cash
282            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
283                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
284        {
285            anyhow::bail!("Cash account cannot trade futures or perpetuals")
286        }
287
288        self.instruments.insert(instrument.id(), instrument.clone());
289
290        let price_protection = if self.price_protection_points == 0 {
291            None
292        } else {
293            Some(self.price_protection_points)
294        };
295
296        let matching_engine_config = OrderMatchingEngineConfig::new(
297            self.bar_execution,
298            self.bar_adaptive_high_low_ordering,
299            self.trade_execution,
300            self.liquidity_consumption,
301            self.reject_stop_orders,
302            self.support_gtd_orders,
303            self.support_contingent_orders,
304            self.use_position_ids,
305            self.use_random_ids,
306            self.use_reduce_only,
307            self.use_market_order_acks,
308        )
309        .with_price_protection_points(price_protection);
310        let instrument_id = instrument.id();
311        let matching_engine = OrderMatchingEngine::new(
312            instrument,
313            self.instruments.len() as u32,
314            self.fill_model.clone(),
315            self.fee_model.clone(),
316            self.book_type,
317            self.oms_type,
318            self.account_type,
319            self.clock.clone(),
320            Rc::clone(&self.cache),
321            matching_engine_config,
322        );
323        self.matching_engines.insert(instrument_id, matching_engine);
324
325        log::info!("Added instrument {instrument_id} and created matching engine");
326        Ok(())
327    }
328
329    #[must_use]
330    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
331        self.matching_engines
332            .get(&instrument_id)
333            .and_then(OrderMatchingEngine::best_bid_price)
334    }
335
336    #[must_use]
337    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
338        self.matching_engines
339            .get(&instrument_id)
340            .and_then(OrderMatchingEngine::best_ask_price)
341    }
342
343    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
344        self.matching_engines
345            .get(&instrument_id)
346            .map(OrderMatchingEngine::get_book)
347    }
348
349    #[must_use]
350    pub fn get_matching_engine(
351        &self,
352        instrument_id: &InstrumentId,
353    ) -> Option<&OrderMatchingEngine> {
354        self.matching_engines.get(instrument_id)
355    }
356
357    #[must_use]
358    pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
359        &self.matching_engines
360    }
361
362    #[must_use]
363    pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
364        let mut books = AHashMap::new();
365        for (instrument_id, matching_engine) in &self.matching_engines {
366            books.insert(*instrument_id, matching_engine.get_book().clone());
367        }
368        books
369    }
370
371    #[must_use]
372    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
373        instrument_id
374            .and_then(|id| {
375                self.matching_engines
376                    .get(&id)
377                    .map(OrderMatchingEngine::get_open_orders)
378            })
379            .unwrap_or_else(|| {
380                self.matching_engines
381                    .values()
382                    .flat_map(OrderMatchingEngine::get_open_orders)
383                    .collect()
384            })
385    }
386
387    #[must_use]
388    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
389        instrument_id
390            .and_then(|id| {
391                self.matching_engines
392                    .get(&id)
393                    .map(|engine| engine.get_open_bid_orders().to_vec())
394            })
395            .unwrap_or_else(|| {
396                self.matching_engines
397                    .values()
398                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
399                    .collect()
400            })
401    }
402
403    #[must_use]
404    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
405        instrument_id
406            .and_then(|id| {
407                self.matching_engines
408                    .get(&id)
409                    .map(|engine| engine.get_open_ask_orders().to_vec())
410            })
411            .unwrap_or_else(|| {
412                self.matching_engines
413                    .values()
414                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
415                    .collect()
416            })
417    }
418
419    /// # Panics
420    ///
421    /// Panics if retrieving the account from the execution client fails.
422    #[must_use]
423    pub fn get_account(&self) -> Option<AccountAny> {
424        self.exec_client
425            .as_ref()
426            .map(|client| client.get_account().unwrap())
427    }
428
429    /// Returns a reference to the cache.
430    #[must_use]
431    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
432        &self.cache
433    }
434
435    /// # Panics
436    ///
437    /// Panics if generating account state fails during adjustment.
438    pub fn adjust_account(&mut self, adjustment: Money) {
439        if self.frozen_account {
440            // Nothing to adjust
441            return;
442        }
443
444        if let Some(exec_client) = &self.exec_client {
445            let venue = exec_client.venue();
446            println!("Adjusting account for venue {venue}");
447            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
448                match account.balance(Some(adjustment.currency)) {
449                    Some(balance) => {
450                        let mut current_balance = *balance;
451                        current_balance.total = current_balance.total + adjustment;
452                        current_balance.free = current_balance.free + adjustment;
453
454                        let margins = match account {
455                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
456                            _ => AHashMap::new(),
457                        };
458
459                        if let Some(exec_client) = &self.exec_client {
460                            exec_client
461                                .generate_account_state(
462                                    vec![current_balance],
463                                    margins.values().copied().collect(),
464                                    true,
465                                    self.clock.borrow().timestamp_ns(),
466                                )
467                                .unwrap();
468                        }
469                    }
470                    None => {
471                        log::error!(
472                            "Cannot adjust account: no balance for currency {}",
473                            adjustment.currency
474                        );
475                    }
476                }
477            } else {
478                log::error!("Cannot adjust account: no account for venue {venue}");
479            }
480        }
481    }
482
483    #[must_use]
484    pub fn has_pending_commands(&self, ts_now: UnixNanos) -> bool {
485        if !self.message_queue.is_empty() {
486            return true;
487        }
488        self.inflight_queue
489            .peek()
490            .is_some_and(|inflight| inflight.timestamp <= ts_now)
491    }
492
493    pub fn send(&mut self, command: TradingCommand) {
494        if !self.use_message_queue {
495            self.process_trading_command(command);
496        } else if self.latency_model.is_none() {
497            self.message_queue.push_back(command);
498        } else {
499            let (timestamp, counter) = self.generate_inflight_command(&command);
500            self.inflight_queue
501                .push(InflightCommand::new(timestamp, counter, command));
502        }
503    }
504
505    /// # Panics
506    ///
507    /// Panics if the command is invalid when generating inflight command.
508    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
509        if let Some(latency_model) = &self.latency_model {
510            let ts = match command {
511                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
512                    command.ts_init() + latency_model.get_insert_latency()
513                }
514                TradingCommand::ModifyOrder(_) => {
515                    command.ts_init() + latency_model.get_update_latency()
516                }
517                TradingCommand::CancelOrder(_)
518                | TradingCommand::CancelAllOrders(_)
519                | TradingCommand::BatchCancelOrders(_) => {
520                    command.ts_init() + latency_model.get_delete_latency()
521                }
522                _ => panic!("Cannot handle command: {command:?}"),
523            };
524
525            let counter = self
526                .inflight_counter
527                .entry(ts)
528                .and_modify(|e| *e += 1)
529                .or_insert(1);
530
531            (ts, *counter)
532        } else {
533            panic!("Latency model should be initialized");
534        }
535    }
536
537    /// # Panics
538    ///
539    /// Panics if adding a missing instrument during delta processing fails.
540    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
541        for module in &self.modules {
542            module.pre_process(Data::Delta(delta));
543        }
544
545        if !self.matching_engines.contains_key(&delta.instrument_id) {
546            let instrument = {
547                let cache = self.cache.as_ref().borrow();
548                cache.instrument(&delta.instrument_id).cloned()
549            };
550
551            if let Some(instrument) = instrument {
552                self.add_instrument(instrument).unwrap();
553            } else {
554                panic!(
555                    "No matching engine found for instrument {}",
556                    delta.instrument_id
557                );
558            }
559        }
560
561        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
562            matching_engine.process_order_book_delta(&delta).unwrap();
563        } else {
564            panic!("Matching engine should be initialized");
565        }
566    }
567
568    /// # Panics
569    ///
570    /// Panics if adding a missing instrument during deltas processing fails.
571    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
572        for module in &self.modules {
573            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
574        }
575
576        if !self.matching_engines.contains_key(&deltas.instrument_id) {
577            let instrument = {
578                let cache = self.cache.as_ref().borrow();
579                cache.instrument(&deltas.instrument_id).cloned()
580            };
581
582            if let Some(instrument) = instrument {
583                self.add_instrument(instrument).unwrap();
584            } else {
585                panic!(
586                    "No matching engine found for instrument {}",
587                    deltas.instrument_id
588                );
589            }
590        }
591
592        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
593            matching_engine.process_order_book_deltas(&deltas).unwrap();
594        } else {
595            panic!("Matching engine should be initialized");
596        }
597    }
598
599    /// # Panics
600    ///
601    /// Panics if adding a missing instrument during depth10 processing fails.
602    pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) {
603        for module in &self.modules {
604            module.pre_process(Data::Depth10(Box::new(*depth)));
605        }
606
607        if !self.matching_engines.contains_key(&depth.instrument_id) {
608            let instrument = {
609                let cache = self.cache.as_ref().borrow();
610                cache.instrument(&depth.instrument_id).cloned()
611            };
612
613            if let Some(instrument) = instrument {
614                self.add_instrument(instrument).unwrap();
615            } else {
616                panic!(
617                    "No matching engine found for instrument {}",
618                    depth.instrument_id
619                );
620            }
621        }
622
623        if let Some(matching_engine) = self.matching_engines.get_mut(&depth.instrument_id) {
624            matching_engine.process_order_book_depth10(depth).unwrap();
625        } else {
626            panic!("Matching engine should be initialized");
627        }
628    }
629
630    /// # Panics
631    ///
632    /// Panics if adding a missing instrument during quote tick processing fails.
633    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
634        for module in &self.modules {
635            module.pre_process(Data::Quote(quote.to_owned()));
636        }
637
638        if !self.matching_engines.contains_key(&quote.instrument_id) {
639            let instrument = {
640                let cache = self.cache.as_ref().borrow();
641                cache.instrument(&quote.instrument_id).cloned()
642            };
643
644            if let Some(instrument) = instrument {
645                self.add_instrument(instrument).unwrap();
646            } else {
647                panic!(
648                    "No matching engine found for instrument {}",
649                    quote.instrument_id
650                );
651            }
652        }
653
654        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
655            matching_engine.process_quote_tick(quote);
656        } else {
657            panic!("Matching engine should be initialized");
658        }
659    }
660
661    /// # Panics
662    ///
663    /// Panics if adding a missing instrument during trade tick processing fails.
664    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
665        for module in &self.modules {
666            module.pre_process(Data::Trade(trade.to_owned()));
667        }
668
669        if !self.matching_engines.contains_key(&trade.instrument_id) {
670            let instrument = {
671                let cache = self.cache.as_ref().borrow();
672                cache.instrument(&trade.instrument_id).cloned()
673            };
674
675            if let Some(instrument) = instrument {
676                self.add_instrument(instrument).unwrap();
677            } else {
678                panic!(
679                    "No matching engine found for instrument {}",
680                    trade.instrument_id
681                );
682            }
683        }
684
685        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
686            matching_engine.process_trade_tick(trade);
687        } else {
688            panic!("Matching engine should be initialized");
689        }
690    }
691
692    /// # Panics
693    ///
694    /// Panics if adding a missing instrument during bar processing fails.
695    pub fn process_bar(&mut self, bar: Bar) {
696        for module in &self.modules {
697            module.pre_process(Data::Bar(bar));
698        }
699
700        if !self.matching_engines.contains_key(&bar.instrument_id()) {
701            let instrument = {
702                let cache = self.cache.as_ref().borrow();
703                cache.instrument(&bar.instrument_id()).cloned()
704            };
705
706            if let Some(instrument) = instrument {
707                self.add_instrument(instrument).unwrap();
708            } else {
709                panic!(
710                    "No matching engine found for instrument {}",
711                    bar.instrument_id()
712                );
713            }
714        }
715
716        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
717            matching_engine.process_bar(&bar);
718        } else {
719            panic!("Matching engine should be initialized");
720        }
721    }
722
723    /// # Panics
724    ///
725    /// Panics if adding a missing instrument during instrument status processing fails.
726    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
727        // TODO add module preprocessing
728
729        if !self.matching_engines.contains_key(&status.instrument_id) {
730            let instrument = {
731                let cache = self.cache.as_ref().borrow();
732                cache.instrument(&status.instrument_id).cloned()
733            };
734
735            if let Some(instrument) = instrument {
736                self.add_instrument(instrument).unwrap();
737            } else {
738                panic!(
739                    "No matching engine found for instrument {}",
740                    status.instrument_id
741                );
742            }
743        }
744
745        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
746            matching_engine.process_status(status.action);
747        } else {
748            panic!("Matching engine should be initialized");
749        }
750    }
751
752    /// # Panics
753    ///
754    /// Panics if adding a missing instrument during instrument close processing fails.
755    pub fn process_instrument_close(&mut self, close: InstrumentClose) {
756        for module in &self.modules {
757            module.pre_process(Data::InstrumentClose(close));
758        }
759
760        if !self.matching_engines.contains_key(&close.instrument_id) {
761            let instrument = {
762                let cache = self.cache.as_ref().borrow();
763                cache.instrument(&close.instrument_id).cloned()
764            };
765
766            if let Some(instrument) = instrument {
767                self.add_instrument(instrument).unwrap();
768            } else {
769                panic!(
770                    "No matching engine found for instrument {}",
771                    close.instrument_id
772                );
773            }
774        }
775
776        if let Some(matching_engine) = self.matching_engines.get_mut(&close.instrument_id) {
777            matching_engine.process_instrument_close(close);
778        } else {
779            panic!("Matching engine should be initialized");
780        }
781    }
782
783    /// # Panics
784    ///
785    /// Panics if popping an inflight command fails during processing.
786    pub fn process(&mut self, ts_now: UnixNanos) {
787        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
788
789        // Process inflight commands
790        while let Some(inflight) = self.inflight_queue.peek() {
791            if inflight.timestamp > ts_now {
792                // Future commands remain in the queue
793                break;
794            }
795            // We get the inflight command, remove it from the queue and process it
796            let inflight = self.inflight_queue.pop().unwrap();
797            self.process_trading_command(inflight.command);
798        }
799
800        // Process regular message queue
801        while let Some(command) = self.message_queue.pop_front() {
802            self.process_trading_command(command);
803        }
804    }
805
806    pub fn reset(&mut self) {
807        for module in &self.modules {
808            module.reset();
809        }
810
811        self.generate_fresh_account_state();
812
813        for matching_engine in self.matching_engines.values_mut() {
814            matching_engine.reset();
815        }
816
817        self.message_queue.clear();
818        self.inflight_queue.clear();
819
820        log::info!("Resetting exchange state");
821    }
822
823    /// # Panics
824    ///
825    /// Panics if execution client is uninitialized when processing trading command.
826    pub fn process_trading_command(&mut self, command: TradingCommand) {
827        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
828            let account_id = if let Some(exec_client) = &self.exec_client {
829                exec_client.account_id()
830            } else {
831                panic!("Execution client should be initialized");
832            };
833            match command {
834                TradingCommand::SubmitOrder(command) => {
835                    let mut order = self
836                        .cache
837                        .borrow()
838                        .order(&command.client_order_id)
839                        .cloned()
840                        .expect("Order must exist in cache");
841                    matching_engine.process_order(&mut order, account_id);
842                }
843                TradingCommand::ModifyOrder(ref command) => {
844                    matching_engine.process_modify(command, account_id);
845                }
846                TradingCommand::CancelOrder(ref command) => {
847                    matching_engine.process_cancel(command, account_id);
848                }
849                TradingCommand::CancelAllOrders(ref command) => {
850                    matching_engine.process_cancel_all(command, account_id);
851                }
852                TradingCommand::BatchCancelOrders(ref command) => {
853                    matching_engine.process_batch_cancel(command, account_id);
854                }
855                TradingCommand::SubmitOrderList(ref command) => {
856                    let mut orders: Vec<OrderAny> = self
857                        .cache
858                        .borrow()
859                        .orders_for_ids(&command.order_list.client_order_ids, command);
860
861                    for order in &mut orders {
862                        matching_engine.process_order(order, account_id);
863                    }
864                }
865                _ => {}
866            }
867        } else {
868            panic!(
869                "Matching engine not found for instrument {}",
870                command.instrument_id()
871            );
872        }
873    }
874
875    /// # Panics
876    ///
877    /// Panics if generating fresh account state fails.
878    pub fn generate_fresh_account_state(&self) {
879        let balances: Vec<AccountBalance> = self
880            .starting_balances
881            .iter()
882            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
883            .collect();
884
885        if let Some(exec_client) = &self.exec_client {
886            exec_client
887                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
888                .unwrap();
889        }
890
891        // Set leverages
892        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
893            margin_account.set_default_leverage(self.default_leverage);
894
895            // Set instrument specific leverages
896            for (instrument_id, leverage) in &self.leverages {
897                margin_account.set_leverage(*instrument_id, *leverage);
898            }
899        }
900    }
901}
902
903#[cfg(test)]
904mod tests {
905    use std::{cell::RefCell, collections::BinaryHeap, rc::Rc};
906
907    use ahash::AHashMap;
908    use nautilus_common::{
909        cache::Cache,
910        clock::TestClock,
911        messages::execution::{SubmitOrder, TradingCommand},
912        msgbus::{self, stubs::get_typed_message_saving_handler},
913    };
914    use nautilus_core::{UUID4, UnixNanos};
915    use nautilus_execution::models::{
916        fee::{FeeModelAny, MakerTakerFeeModel},
917        fill::FillModelAny,
918        latency::StaticLatencyModel,
919    };
920    use nautilus_model::{
921        accounts::{AccountAny, MarginAccount},
922        data::{
923            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
924            TradeTick,
925        },
926        enums::{
927            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
928            OmsType, OrderSide, OrderType,
929        },
930        events::AccountState,
931        identifiers::{
932            AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
933        },
934        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
935        orders::{Order, OrderAny, OrderTestBuilder},
936        stubs::TestDefault,
937        types::{AccountBalance, Currency, Money, Price, Quantity},
938    };
939    use rstest::rstest;
940
941    use crate::{
942        exchange::{InflightCommand, SimulatedExchange},
943        execution_client::BacktestExecutionClient,
944    };
945
946    fn get_exchange(
947        venue: Venue,
948        account_type: AccountType,
949        book_type: BookType,
950        cache: Option<Rc<RefCell<Cache>>>,
951    ) -> Rc<RefCell<SimulatedExchange>> {
952        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
953        let clock = Rc::new(RefCell::new(TestClock::new()));
954        let exchange = Rc::new(RefCell::new(
955            SimulatedExchange::new(
956                venue,
957                OmsType::Netting,
958                account_type,
959                vec![Money::new(1000.0, Currency::USD())],
960                None,
961                1.into(),
962                AHashMap::new(),
963                vec![],
964                cache.clone(),
965                clock,
966                FillModelAny::default(),
967                FeeModelAny::MakerTaker(MakerTakerFeeModel),
968                book_type,
969                None, // latency_model
970                None, // bar_execution
971                None, // bar_adaptive_high_low_ordering
972                None, // trade_execution
973                None, // liquidity_consumption
974                None, // reject_stop_orders
975                None, // support_gtd_orders
976                None, // support_contingent_orders
977                None, // use_position_ids
978                None, // use_random_ids
979                None, // use_reduce_only
980                None, // use_message_queue
981                None, // use_market_order_acks
982                None, // allow_cash_borrowing
983                None, // frozen_account
984                None, // price_protection_points
985            )
986            .unwrap(),
987        ));
988
989        let clock = TestClock::new();
990        let execution_client = BacktestExecutionClient::new(
991            TraderId::test_default(),
992            AccountId::test_default(),
993            exchange.clone(),
994            cache,
995            Rc::new(RefCell::new(clock)),
996            None,
997            None,
998        );
999        exchange
1000            .borrow_mut()
1001            .register_client(Rc::new(execution_client));
1002
1003        exchange
1004    }
1005
1006    fn create_submit_order_command(
1007        ts_init: UnixNanos,
1008        client_order_id: &str,
1009    ) -> (OrderAny, TradingCommand) {
1010        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
1011        let order = OrderTestBuilder::new(OrderType::Market)
1012            .instrument_id(instrument_id)
1013            .client_order_id(ClientOrderId::new(client_order_id))
1014            .quantity(Quantity::from(1))
1015            .build();
1016        let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1017            TraderId::test_default(),
1018            None,
1019            StrategyId::test_default(),
1020            instrument_id,
1021            order.client_order_id(),
1022            order.init_event().clone(),
1023            None,
1024            None,
1025            None, // params
1026            UUID4::default(),
1027            ts_init,
1028        ));
1029        (order, command)
1030    }
1031
1032    #[rstest]
1033    #[should_panic(
1034        expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
1035    )]
1036    fn test_venue_mismatch_between_exchange_and_instrument(
1037        crypto_perpetual_ethusdt: CryptoPerpetual,
1038    ) {
1039        let exchange = get_exchange(
1040            Venue::new("SIM"),
1041            AccountType::Margin,
1042            BookType::L1_MBP,
1043            None,
1044        );
1045        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1046        exchange.borrow_mut().add_instrument(instrument).unwrap();
1047    }
1048
1049    #[rstest]
1050    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
1051    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
1052        let exchange = get_exchange(
1053            Venue::new("BINANCE"),
1054            AccountType::Cash,
1055            BookType::L1_MBP,
1056            None,
1057        );
1058        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1059        exchange.borrow_mut().add_instrument(instrument).unwrap();
1060    }
1061
1062    #[rstest]
1063    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1064        let exchange = get_exchange(
1065            Venue::new("BINANCE"),
1066            AccountType::Margin,
1067            BookType::L1_MBP,
1068            None,
1069        );
1070        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1071
1072        // register instrument
1073        exchange.borrow_mut().add_instrument(instrument).unwrap();
1074
1075        // process tick
1076        let quote_tick = QuoteTick::new(
1077            crypto_perpetual_ethusdt.id,
1078            Price::from("1000.00"),
1079            Price::from("1001.00"),
1080            Quantity::from("1.000"),
1081            Quantity::from("1.000"),
1082            UnixNanos::default(),
1083            UnixNanos::default(),
1084        );
1085        exchange.borrow_mut().process_quote_tick(&quote_tick);
1086
1087        let best_bid_price = exchange
1088            .borrow()
1089            .best_bid_price(crypto_perpetual_ethusdt.id);
1090        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1091        let best_ask_price = exchange
1092            .borrow()
1093            .best_ask_price(crypto_perpetual_ethusdt.id);
1094        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1095    }
1096
1097    #[rstest]
1098    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1099        let exchange = get_exchange(
1100            Venue::new("BINANCE"),
1101            AccountType::Margin,
1102            BookType::L1_MBP,
1103            None,
1104        );
1105        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1106
1107        // register instrument
1108        exchange.borrow_mut().add_instrument(instrument).unwrap();
1109
1110        // process tick
1111        let trade_tick = TradeTick::new(
1112            crypto_perpetual_ethusdt.id,
1113            Price::from("1000.00"),
1114            Quantity::from("1.000"),
1115            AggressorSide::Buyer,
1116            TradeId::from("1"),
1117            UnixNanos::default(),
1118            UnixNanos::default(),
1119        );
1120        exchange.borrow_mut().process_trade_tick(&trade_tick);
1121
1122        let best_bid_price = exchange
1123            .borrow()
1124            .best_bid_price(crypto_perpetual_ethusdt.id);
1125        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1126        let best_ask = exchange
1127            .borrow()
1128            .best_ask_price(crypto_perpetual_ethusdt.id);
1129        assert_eq!(best_ask, Some(Price::from("1000.00")));
1130    }
1131
1132    #[rstest]
1133    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1134        let exchange = get_exchange(
1135            Venue::new("BINANCE"),
1136            AccountType::Margin,
1137            BookType::L1_MBP,
1138            None,
1139        );
1140        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1141
1142        // register instrument
1143        exchange.borrow_mut().add_instrument(instrument).unwrap();
1144
1145        // process bar
1146        let bar = Bar::new(
1147            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1148            Price::from("1500.00"),
1149            Price::from("1505.00"),
1150            Price::from("1490.00"),
1151            Price::from("1502.00"),
1152            Quantity::from("100.000"),
1153            UnixNanos::default(),
1154            UnixNanos::default(),
1155        );
1156        exchange.borrow_mut().process_bar(bar);
1157
1158        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1159        let best_bid_price = exchange
1160            .borrow()
1161            .best_bid_price(crypto_perpetual_ethusdt.id);
1162        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1163        let best_ask_price = exchange
1164            .borrow()
1165            .best_ask_price(crypto_perpetual_ethusdt.id);
1166        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1167    }
1168
1169    #[rstest]
1170    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1171        let exchange = get_exchange(
1172            Venue::new("BINANCE"),
1173            AccountType::Margin,
1174            BookType::L1_MBP,
1175            None,
1176        );
1177        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1178
1179        // register instrument
1180        exchange.borrow_mut().add_instrument(instrument).unwrap();
1181
1182        // create both bid and ask based bars
1183        // add +1 on ask to make sure it is different from bid
1184        let bar_bid = Bar::new(
1185            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1186            Price::from("1500.00"),
1187            Price::from("1505.00"),
1188            Price::from("1490.00"),
1189            Price::from("1502.00"),
1190            Quantity::from("100.000"),
1191            UnixNanos::from(1),
1192            UnixNanos::from(1),
1193        );
1194        let bar_ask = Bar::new(
1195            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1196            Price::from("1501.00"),
1197            Price::from("1506.00"),
1198            Price::from("1491.00"),
1199            Price::from("1503.00"),
1200            Quantity::from("100.000"),
1201            UnixNanos::from(1),
1202            UnixNanos::from(1),
1203        );
1204
1205        // process them
1206        exchange.borrow_mut().process_bar(bar_bid);
1207        exchange.borrow_mut().process_bar(bar_ask);
1208
1209        // current bid and ask prices will be the corresponding close of the ask and bid bar
1210        let best_bid_price = exchange
1211            .borrow()
1212            .best_bid_price(crypto_perpetual_ethusdt.id);
1213        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1214        let best_ask_price = exchange
1215            .borrow()
1216            .best_ask_price(crypto_perpetual_ethusdt.id);
1217        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1218    }
1219
1220    #[rstest]
1221    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1222        let exchange = get_exchange(
1223            Venue::new("BINANCE"),
1224            AccountType::Margin,
1225            BookType::L2_MBP,
1226            None,
1227        );
1228        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1229
1230        // register instrument
1231        exchange.borrow_mut().add_instrument(instrument).unwrap();
1232
1233        // create order book delta at both bid and ask with incremented ts init and sequence
1234        let delta_buy = OrderBookDelta::new(
1235            crypto_perpetual_ethusdt.id,
1236            BookAction::Add,
1237            BookOrder::new(
1238                OrderSide::Buy,
1239                Price::from("1000.00"),
1240                Quantity::from("1.000"),
1241                1,
1242            ),
1243            0,
1244            0,
1245            UnixNanos::from(1),
1246            UnixNanos::from(1),
1247        );
1248        let delta_sell = OrderBookDelta::new(
1249            crypto_perpetual_ethusdt.id,
1250            BookAction::Add,
1251            BookOrder::new(
1252                OrderSide::Sell,
1253                Price::from("1001.00"),
1254                Quantity::from("1.000"),
1255                1,
1256            ),
1257            0,
1258            1,
1259            UnixNanos::from(2),
1260            UnixNanos::from(2),
1261        );
1262
1263        // process both deltas
1264        exchange.borrow_mut().process_order_book_delta(delta_buy);
1265        exchange.borrow_mut().process_order_book_delta(delta_sell);
1266
1267        let book = exchange
1268            .borrow()
1269            .get_book(crypto_perpetual_ethusdt.id)
1270            .unwrap()
1271            .clone();
1272        assert_eq!(book.update_count, 2);
1273        assert_eq!(book.sequence, 1);
1274        assert_eq!(book.ts_last, UnixNanos::from(2));
1275        let best_bid_price = exchange
1276            .borrow()
1277            .best_bid_price(crypto_perpetual_ethusdt.id);
1278        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1279        let best_ask_price = exchange
1280            .borrow()
1281            .best_ask_price(crypto_perpetual_ethusdt.id);
1282        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1283    }
1284
1285    #[rstest]
1286    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1287        let exchange = get_exchange(
1288            Venue::new("BINANCE"),
1289            AccountType::Margin,
1290            BookType::L2_MBP,
1291            None,
1292        );
1293        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1294
1295        // register instrument
1296        exchange.borrow_mut().add_instrument(instrument).unwrap();
1297
1298        // create two sell order book deltas with same timestamps and higher sequence
1299        let delta_sell_1 = OrderBookDelta::new(
1300            crypto_perpetual_ethusdt.id,
1301            BookAction::Add,
1302            BookOrder::new(
1303                OrderSide::Sell,
1304                Price::from("1000.00"),
1305                Quantity::from("3.000"),
1306                1,
1307            ),
1308            0,
1309            0,
1310            UnixNanos::from(1),
1311            UnixNanos::from(1),
1312        );
1313        let delta_sell_2 = OrderBookDelta::new(
1314            crypto_perpetual_ethusdt.id,
1315            BookAction::Add,
1316            BookOrder::new(
1317                OrderSide::Sell,
1318                Price::from("1001.00"),
1319                Quantity::from("1.000"),
1320                1,
1321            ),
1322            0,
1323            1,
1324            UnixNanos::from(1),
1325            UnixNanos::from(1),
1326        );
1327        let orderbook_deltas = OrderBookDeltas::new(
1328            crypto_perpetual_ethusdt.id,
1329            vec![delta_sell_1, delta_sell_2],
1330        );
1331
1332        // process both deltas
1333        exchange
1334            .borrow_mut()
1335            .process_order_book_deltas(orderbook_deltas);
1336
1337        let book = exchange
1338            .borrow()
1339            .get_book(crypto_perpetual_ethusdt.id)
1340            .unwrap()
1341            .clone();
1342        assert_eq!(book.update_count, 2);
1343        assert_eq!(book.sequence, 1);
1344        assert_eq!(book.ts_last, UnixNanos::from(1));
1345        let best_bid_price = exchange
1346            .borrow()
1347            .best_bid_price(crypto_perpetual_ethusdt.id);
1348        // no bid orders in orderbook deltas
1349        assert_eq!(best_bid_price, None);
1350        let best_ask_price = exchange
1351            .borrow()
1352            .best_ask_price(crypto_perpetual_ethusdt.id);
1353        // best ask price is the first order in orderbook deltas
1354        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1355    }
1356
1357    #[rstest]
1358    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1359        let exchange = get_exchange(
1360            Venue::new("BINANCE"),
1361            AccountType::Margin,
1362            BookType::L2_MBP,
1363            None,
1364        );
1365        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1366
1367        // register instrument
1368        exchange.borrow_mut().add_instrument(instrument).unwrap();
1369
1370        let instrument_status = InstrumentStatus::new(
1371            crypto_perpetual_ethusdt.id,
1372            MarketStatusAction::Close, // close the market
1373            UnixNanos::from(1),
1374            UnixNanos::from(1),
1375            None,
1376            None,
1377            None,
1378            None,
1379            None,
1380        );
1381
1382        exchange
1383            .borrow_mut()
1384            .process_instrument_status(instrument_status);
1385
1386        let market_status = exchange
1387            .borrow()
1388            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1389            .unwrap()
1390            .market_status;
1391        assert_eq!(market_status, MarketStatus::Closed);
1392    }
1393
1394    #[rstest]
1395    fn test_accounting() {
1396        let account_type = AccountType::Margin;
1397        let mut cache = Cache::default();
1398        let (handler, saving_handler) = get_typed_message_saving_handler::<AccountState>(None);
1399        msgbus::register_account_state_endpoint("Portfolio.update_account".into(), handler);
1400        let margin_account = MarginAccount::new(
1401            AccountState::new(
1402                AccountId::from("SIM-001"),
1403                account_type,
1404                vec![AccountBalance::new(
1405                    Money::from("1000 USD"),
1406                    Money::from("0 USD"),
1407                    Money::from("1000 USD"),
1408                )],
1409                vec![],
1410                false,
1411                UUID4::default(),
1412                UnixNanos::default(),
1413                UnixNanos::default(),
1414                None,
1415            ),
1416            false,
1417        );
1418        let () = cache
1419            .add_account(AccountAny::Margin(margin_account))
1420            .unwrap();
1421        // build indexes
1422        cache.build_index();
1423
1424        let exchange = get_exchange(
1425            Venue::new("SIM"),
1426            account_type,
1427            BookType::L2_MBP,
1428            Some(Rc::new(RefCell::new(cache))),
1429        );
1430        exchange.borrow_mut().initialize_account();
1431
1432        // Test adjust account, increase balance by 500 USD
1433        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1434
1435        // Check if we received two messages, one for initial account state and one for adjusted account state
1436        let messages = saving_handler.get_messages();
1437        assert_eq!(messages.len(), 2);
1438        let account_state_first = messages.first().unwrap();
1439        let account_state_second = messages.last().unwrap();
1440
1441        assert_eq!(account_state_first.balances.len(), 1);
1442        let current_balance = account_state_first.balances[0];
1443        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1444        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1445        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1446
1447        assert_eq!(account_state_second.balances.len(), 1);
1448        let current_balance = account_state_second.balances[0];
1449        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1450        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1451        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1452    }
1453
1454    #[rstest]
1455    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1456        // Create 3 inflight commands with different timestamps and counters
1457        let (_, cmd1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1458        let (_, cmd2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1459        let (_, cmd3) = create_submit_order_command(UnixNanos::from(100), "O-3");
1460
1461        let inflight1 = InflightCommand::new(UnixNanos::from(100), 1, cmd1);
1462        let inflight2 = InflightCommand::new(UnixNanos::from(200), 2, cmd2);
1463        let inflight3 = InflightCommand::new(UnixNanos::from(100), 2, cmd3);
1464
1465        // Create a binary heap and push the inflight commands
1466        let mut inflight_heap = BinaryHeap::new();
1467        inflight_heap.push(inflight1);
1468        inflight_heap.push(inflight2);
1469        inflight_heap.push(inflight3);
1470
1471        // Pop the inflight commands and check if they are in the correct order
1472        // by our custom ordering with counter and timestamp
1473        let first = inflight_heap.pop().unwrap();
1474        let second = inflight_heap.pop().unwrap();
1475        let third = inflight_heap.pop().unwrap();
1476
1477        assert_eq!(first.timestamp, UnixNanos::from(100));
1478        assert_eq!(first.counter, 1);
1479        assert_eq!(second.timestamp, UnixNanos::from(100));
1480        assert_eq!(second.counter, 2);
1481        assert_eq!(third.timestamp, UnixNanos::from(200));
1482        assert_eq!(third.counter, 2);
1483    }
1484
1485    #[rstest]
1486    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1487        let exchange = get_exchange(
1488            Venue::new("BINANCE"),
1489            AccountType::Margin,
1490            BookType::L2_MBP,
1491            None,
1492        );
1493
1494        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1495        exchange.borrow_mut().add_instrument(instrument).unwrap();
1496
1497        let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1498        let (order2, command2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1499
1500        exchange
1501            .borrow()
1502            .cache()
1503            .borrow_mut()
1504            .add_order(order1, None, None, false)
1505            .unwrap();
1506        exchange
1507            .borrow()
1508            .cache()
1509            .borrow_mut()
1510            .add_order(order2, None, None, false)
1511            .unwrap();
1512
1513        exchange.borrow_mut().send(command1);
1514        exchange.borrow_mut().send(command2);
1515
1516        // Verify that message queue has 2 commands and inflight queue is empty
1517        // as we are not using latency model
1518        assert_eq!(exchange.borrow().message_queue.len(), 2);
1519        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1520
1521        // Process command and check that queues is empty
1522        exchange.borrow_mut().process(UnixNanos::from(300));
1523        assert_eq!(exchange.borrow().message_queue.len(), 0);
1524        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1525    }
1526
1527    #[rstest]
1528    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1529        // StaticLatencyModel adds base_latency to each operation latency
1530        // base=100, insert=200 -> effective insert latency = 300
1531        let latency_model = StaticLatencyModel::new(
1532            UnixNanos::from(100),
1533            UnixNanos::from(200),
1534            UnixNanos::from(300),
1535            UnixNanos::from(100),
1536        );
1537        let exchange = get_exchange(
1538            Venue::new("BINANCE"),
1539            AccountType::Margin,
1540            BookType::L2_MBP,
1541            None,
1542        );
1543        exchange
1544            .borrow_mut()
1545            .set_latency_model(Box::new(latency_model));
1546
1547        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1548        exchange.borrow_mut().add_instrument(instrument).unwrap();
1549
1550        let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1551        let (order2, command2) = create_submit_order_command(UnixNanos::from(150), "O-2");
1552
1553        exchange
1554            .borrow()
1555            .cache()
1556            .borrow_mut()
1557            .add_order(order1, None, None, false)
1558            .unwrap();
1559        exchange
1560            .borrow()
1561            .cache()
1562            .borrow_mut()
1563            .add_order(order2, None, None, false)
1564            .unwrap();
1565
1566        exchange.borrow_mut().send(command1);
1567        exchange.borrow_mut().send(command2);
1568
1569        // Verify that inflight queue has 2 commands and message queue is empty
1570        assert_eq!(exchange.borrow().message_queue.len(), 0);
1571        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1572        // First inflight command: ts_init=100 + effective_insert_latency=300 = 400
1573        assert_eq!(
1574            exchange
1575                .borrow()
1576                .inflight_queue
1577                .iter()
1578                .next()
1579                .unwrap()
1580                .timestamp,
1581            UnixNanos::from(400)
1582        );
1583        // Second inflight command: ts_init=150 + effective_insert_latency=300 = 450
1584        assert_eq!(
1585            exchange
1586                .borrow()
1587                .inflight_queue
1588                .iter()
1589                .nth(1)
1590                .unwrap()
1591                .timestamp,
1592            UnixNanos::from(450)
1593        );
1594
1595        // Process at timestamp 420, and test that only first command is processed
1596        exchange.borrow_mut().process(UnixNanos::from(420));
1597        assert_eq!(exchange.borrow().message_queue.len(), 0);
1598        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1599        assert_eq!(
1600            exchange
1601                .borrow()
1602                .inflight_queue
1603                .iter()
1604                .next()
1605                .unwrap()
1606                .timestamp,
1607            UnixNanos::from(450)
1608        );
1609    }
1610
1611    #[rstest]
1612    fn test_process_iterates_matching_engines_after_commands(
1613        crypto_perpetual_ethusdt: CryptoPerpetual,
1614    ) {
1615        let cache = Rc::new(RefCell::new(Cache::default()));
1616        let exchange = get_exchange(
1617            Venue::new("BINANCE"),
1618            AccountType::Margin,
1619            BookType::L1_MBP,
1620            Some(cache.clone()),
1621        );
1622        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1623        let instrument_id = crypto_perpetual_ethusdt.id;
1624        exchange.borrow_mut().add_instrument(instrument).unwrap();
1625
1626        let quote = QuoteTick::new(
1627            instrument_id,
1628            Price::from("1000.00"),
1629            Price::from("1001.00"),
1630            Quantity::from("1.000"),
1631            Quantity::from("1.000"),
1632            UnixNanos::from(1),
1633            UnixNanos::from(1),
1634        );
1635        exchange.borrow_mut().process_quote_tick(&quote);
1636
1637        // Create a passive buy limit below the ask (should NOT fill)
1638        let order = OrderTestBuilder::new(OrderType::Limit)
1639            .instrument_id(instrument_id)
1640            .client_order_id(ClientOrderId::new("O-LIMIT-1"))
1641            .side(OrderSide::Buy)
1642            .quantity(Quantity::from("1.000"))
1643            .price(Price::from("999.00"))
1644            .build();
1645
1646        cache
1647            .borrow_mut()
1648            .add_order(order.clone(), None, None, false)
1649            .unwrap();
1650
1651        let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1652            TraderId::test_default(),
1653            None,
1654            StrategyId::test_default(),
1655            instrument_id,
1656            order.client_order_id(),
1657            order.init_event().clone(),
1658            None,
1659            None,
1660            None,
1661            UUID4::default(),
1662            UnixNanos::from(1),
1663        ));
1664        exchange.borrow_mut().send(command);
1665
1666        exchange.borrow_mut().process(UnixNanos::from(1));
1667
1668        let open_orders = exchange.borrow().get_open_orders(Some(instrument_id));
1669        assert_eq!(open_orders.len(), 1);
1670        assert_eq!(
1671            open_orders[0].client_order_id,
1672            ClientOrderId::new("O-LIMIT-1")
1673        );
1674    }
1675}