nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, HashMap, VecDeque},
25    rc::Rc,
26};
27
28use nautilus_common::{cache::Cache, clock::Clock};
29use nautilus_core::{
30    UnixNanos,
31    correctness::{FAILED, check_equal},
32};
33use nautilus_execution::{
34    client::ExecutionClient,
35    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
36    messages::TradingCommand,
37    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40    accounts::AccountAny,
41    data::{
42        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43        QuoteTick, TradeTick,
44    },
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orderbook::OrderBook,
49    orders::PassiveOrderAny,
50    types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
56/// Represents commands with simulated network latency in a min-heap priority queue.
57/// The commands are ordered by timestamp for FIFO processing, with the
58/// earliest timestamp having the highest priority in the queue.
59#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61    ts: UnixNanos,
62    counter: u32,
63    command: TradingCommand,
64}
65
66impl InflightCommand {
67    const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68        Self {
69            ts,
70            counter,
71            command,
72        }
73    }
74}
75
76impl Ord for InflightCommand {
77    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
79        other
80            .ts
81            .cmp(&self.ts)
82            .then_with(|| other.counter.cmp(&self.counter))
83    }
84}
85
86impl PartialOrd for InflightCommand {
87    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92pub struct SimulatedExchange {
93    pub id: Venue,
94    pub oms_type: OmsType,
95    pub account_type: AccountType,
96    starting_balances: Vec<Money>,
97    book_type: BookType,
98    default_leverage: Decimal,
99    exec_client: Option<Rc<dyn ExecutionClient>>,
100    pub base_currency: Option<Currency>,
101    fee_model: FeeModelAny,
102    fill_model: FillModel,
103    latency_model: Option<LatencyModel>,
104    instruments: HashMap<InstrumentId, InstrumentAny>,
105    matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
106    leverages: HashMap<InstrumentId, Decimal>,
107    modules: Vec<Box<dyn SimulationModule>>,
108    clock: Rc<RefCell<dyn Clock>>,
109    cache: Rc<RefCell<Cache>>,
110    message_queue: VecDeque<TradingCommand>,
111    inflight_queue: BinaryHeap<InflightCommand>,
112    inflight_counter: HashMap<UnixNanos, u32>,
113    frozen_account: bool,
114    bar_execution: bool,
115    reject_stop_orders: bool,
116    support_gtd_orders: bool,
117    support_contingent_orders: bool,
118    use_position_ids: bool,
119    use_random_ids: bool,
120    use_reduce_only: bool,
121    use_message_queue: bool,
122}
123
124impl SimulatedExchange {
125    /// Creates a new [`SimulatedExchange`] instance.
126    #[allow(clippy::too_many_arguments)]
127    pub fn new(
128        venue: Venue,
129        oms_type: OmsType,
130        account_type: AccountType,
131        starting_balances: Vec<Money>,
132        base_currency: Option<Currency>,
133        default_leverage: Decimal,
134        leverages: HashMap<InstrumentId, Decimal>,
135        modules: Vec<Box<dyn SimulationModule>>,
136        cache: Rc<RefCell<Cache>>,
137        clock: Rc<RefCell<dyn Clock>>,
138        fill_model: FillModel,
139        fee_model: FeeModelAny,
140        book_type: BookType,
141        latency_model: Option<LatencyModel>,
142        frozen_account: Option<bool>,
143        bar_execution: Option<bool>,
144        reject_stop_orders: Option<bool>,
145        support_gtd_orders: Option<bool>,
146        support_contingent_orders: Option<bool>,
147        use_position_ids: Option<bool>,
148        use_random_ids: Option<bool>,
149        use_reduce_only: Option<bool>,
150        use_message_queue: Option<bool>,
151    ) -> anyhow::Result<Self> {
152        if starting_balances.is_empty() {
153            anyhow::bail!("Starting balances must be provided")
154        }
155        if base_currency.is_some() && starting_balances.len() > 1 {
156            anyhow::bail!("single-currency account has multiple starting currencies")
157        }
158        // TODO register and load modules
159        Ok(Self {
160            id: venue,
161            oms_type,
162            account_type,
163            starting_balances,
164            book_type,
165            default_leverage,
166            exec_client: None,
167            base_currency,
168            fee_model,
169            fill_model,
170            latency_model,
171            instruments: HashMap::new(),
172            matching_engines: HashMap::new(),
173            leverages,
174            modules,
175            clock,
176            cache,
177            message_queue: VecDeque::new(),
178            inflight_queue: BinaryHeap::new(),
179            inflight_counter: HashMap::new(),
180            frozen_account: frozen_account.unwrap_or(false),
181            bar_execution: bar_execution.unwrap_or(true),
182            reject_stop_orders: reject_stop_orders.unwrap_or(true),
183            support_gtd_orders: support_gtd_orders.unwrap_or(true),
184            support_contingent_orders: support_contingent_orders.unwrap_or(true),
185            use_position_ids: use_position_ids.unwrap_or(true),
186            use_random_ids: use_random_ids.unwrap_or(false),
187            use_reduce_only: use_reduce_only.unwrap_or(true),
188            use_message_queue: use_message_queue.unwrap_or(true),
189        })
190    }
191
192    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
193        self.exec_client = Some(client);
194    }
195
196    pub fn set_fill_model(&mut self, fill_model: FillModel) {
197        for matching_engine in self.matching_engines.values_mut() {
198            matching_engine.set_fill_model(fill_model.clone());
199            log::info!(
200                "Setting fill model for {} to {}",
201                matching_engine.venue,
202                self.fill_model
203            );
204        }
205        self.fill_model = fill_model;
206    }
207
208    pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
209        self.latency_model = Some(latency_model);
210    }
211
212    pub fn initialize_account(&mut self) {
213        self.generate_fresh_account_state();
214    }
215
216    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
217        check_equal(
218            instrument.id().venue,
219            self.id,
220            "Venue of instrument id",
221            "Venue of simulated exchange",
222        )
223        .expect(FAILED);
224
225        if self.account_type == AccountType::Cash
226            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
227                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
228        {
229            anyhow::bail!("Cash account cannot trade futures or perpetuals")
230        }
231
232        self.instruments.insert(instrument.id(), instrument.clone());
233
234        let matching_engine_config = OrderMatchingEngineConfig::new(
235            self.bar_execution,
236            self.reject_stop_orders,
237            self.support_gtd_orders,
238            self.support_contingent_orders,
239            self.use_position_ids,
240            self.use_random_ids,
241            self.use_reduce_only,
242        );
243        let instrument_id = instrument.id();
244        let matching_engine = OrderMatchingEngine::new(
245            instrument,
246            self.instruments.len() as u32,
247            self.fill_model.clone(),
248            self.fee_model.clone(),
249            self.book_type,
250            self.oms_type,
251            self.account_type,
252            self.clock.clone(),
253            Rc::clone(&self.cache),
254            matching_engine_config,
255        );
256        self.matching_engines.insert(instrument_id, matching_engine);
257
258        log::info!("Added instrument {instrument_id} and created matching engine");
259        Ok(())
260    }
261
262    #[must_use]
263    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
264        self.matching_engines
265            .get(&instrument_id)
266            .and_then(OrderMatchingEngine::best_bid_price)
267    }
268
269    #[must_use]
270    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
271        self.matching_engines
272            .get(&instrument_id)
273            .and_then(OrderMatchingEngine::best_ask_price)
274    }
275
276    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
277        self.matching_engines
278            .get(&instrument_id)
279            .map(OrderMatchingEngine::get_book)
280    }
281
282    #[must_use]
283    pub fn get_matching_engine(
284        &self,
285        instrument_id: &InstrumentId,
286    ) -> Option<&OrderMatchingEngine> {
287        self.matching_engines.get(instrument_id)
288    }
289
290    #[must_use]
291    pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
292        &self.matching_engines
293    }
294
295    #[must_use]
296    pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
297        let mut books = HashMap::new();
298        for (instrument_id, matching_engine) in &self.matching_engines {
299            books.insert(*instrument_id, matching_engine.get_book().clone());
300        }
301        books
302    }
303
304    #[must_use]
305    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
306        instrument_id
307            .and_then(|id| {
308                self.matching_engines
309                    .get(&id)
310                    .map(OrderMatchingEngine::get_open_orders)
311            })
312            .unwrap_or_else(|| {
313                self.matching_engines
314                    .values()
315                    .flat_map(OrderMatchingEngine::get_open_orders)
316                    .collect()
317            })
318    }
319
320    #[must_use]
321    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
322        instrument_id
323            .and_then(|id| {
324                self.matching_engines
325                    .get(&id)
326                    .map(|engine| engine.get_open_bid_orders().to_vec())
327            })
328            .unwrap_or_else(|| {
329                self.matching_engines
330                    .values()
331                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
332                    .collect()
333            })
334    }
335
336    #[must_use]
337    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
338        instrument_id
339            .and_then(|id| {
340                self.matching_engines
341                    .get(&id)
342                    .map(|engine| engine.get_open_ask_orders().to_vec())
343            })
344            .unwrap_or_else(|| {
345                self.matching_engines
346                    .values()
347                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
348                    .collect()
349            })
350    }
351
352    #[must_use]
353    pub fn get_account(&self) -> Option<AccountAny> {
354        self.exec_client
355            .as_ref()
356            .map(|client| client.get_account().unwrap())
357    }
358
359    pub fn adjust_account(&mut self, adjustment: Money) {
360        if self.frozen_account {
361            // Nothing to adjust
362            return;
363        }
364
365        if let Some(exec_client) = &self.exec_client {
366            let venue = exec_client.venue();
367            println!("Adjusting account for venue {venue}");
368            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
369                match account.balance(Some(adjustment.currency)) {
370                    Some(balance) => {
371                        let mut current_balance = *balance;
372                        current_balance.total += adjustment;
373                        current_balance.free += adjustment;
374
375                        let margins = match account {
376                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
377                            _ => HashMap::new(),
378                        };
379
380                        if let Some(exec_client) = &self.exec_client {
381                            exec_client
382                                .generate_account_state(
383                                    vec![current_balance],
384                                    margins.values().copied().collect(),
385                                    true,
386                                    self.clock.borrow().timestamp_ns(),
387                                )
388                                .unwrap();
389                        }
390                    }
391                    None => {
392                        log::error!(
393                            "Cannot adjust account: no balance for currency {}",
394                            adjustment.currency
395                        );
396                    }
397                }
398            } else {
399                log::error!("Cannot adjust account: no account for venue {venue}");
400            }
401        }
402    }
403
404    pub fn send(&mut self, command: TradingCommand) {
405        if !self.use_message_queue {
406            self.process_trading_command(command);
407        } else if self.latency_model.is_none() {
408            self.message_queue.push_back(command);
409        } else {
410            let (ts, counter) = self.generate_inflight_command(&command);
411            self.inflight_queue
412                .push(InflightCommand::new(ts, counter, command));
413        }
414    }
415
416    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
417        if let Some(latency_model) = &self.latency_model {
418            let ts = match command {
419                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
420                    command.ts_init() + latency_model.insert_latency_nanos
421                }
422                TradingCommand::ModifyOrder(_) => {
423                    command.ts_init() + latency_model.update_latency_nanos
424                }
425                TradingCommand::CancelOrder(_)
426                | TradingCommand::CancelAllOrders(_)
427                | TradingCommand::BatchCancelOrders(_) => {
428                    command.ts_init() + latency_model.delete_latency_nanos
429                }
430                _ => panic!("Invalid command was {command}"),
431            };
432
433            let counter = self
434                .inflight_counter
435                .entry(ts)
436                .and_modify(|e| *e += 1)
437                .or_insert(1);
438
439            (ts, *counter)
440        } else {
441            panic!("Latency model should be initialized");
442        }
443    }
444
445    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
446        for module in &self.modules {
447            module.pre_process(Data::Delta(delta));
448        }
449
450        if !self.matching_engines.contains_key(&delta.instrument_id) {
451            let instrument = {
452                let cache = self.cache.as_ref().borrow();
453                cache.instrument(&delta.instrument_id).cloned()
454            };
455
456            if let Some(instrument) = instrument {
457                self.add_instrument(instrument).unwrap();
458            } else {
459                panic!(
460                    "No matching engine found for instrument {}",
461                    delta.instrument_id
462                );
463            }
464        }
465
466        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
467            matching_engine.process_order_book_delta(&delta);
468        } else {
469            panic!("Matching engine should be initialized");
470        }
471    }
472
473    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
474        for module in &self.modules {
475            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
476        }
477
478        if !self.matching_engines.contains_key(&deltas.instrument_id) {
479            let instrument = {
480                let cache = self.cache.as_ref().borrow();
481                cache.instrument(&deltas.instrument_id).cloned()
482            };
483
484            if let Some(instrument) = instrument {
485                self.add_instrument(instrument).unwrap();
486            } else {
487                panic!(
488                    "No matching engine found for instrument {}",
489                    deltas.instrument_id
490                );
491            }
492        }
493
494        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
495            matching_engine.process_order_book_deltas(&deltas);
496        } else {
497            panic!("Matching engine should be initialized");
498        }
499    }
500
501    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
502        for module in &self.modules {
503            module.pre_process(Data::Quote(quote.to_owned()));
504        }
505
506        if !self.matching_engines.contains_key(&quote.instrument_id) {
507            let instrument = {
508                let cache = self.cache.as_ref().borrow();
509                cache.instrument(&quote.instrument_id).cloned()
510            };
511
512            if let Some(instrument) = instrument {
513                self.add_instrument(instrument).unwrap();
514            } else {
515                panic!(
516                    "No matching engine found for instrument {}",
517                    quote.instrument_id
518                );
519            }
520        }
521
522        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
523            matching_engine.process_quote_tick(quote);
524        } else {
525            panic!("Matching engine should be initialized");
526        }
527    }
528
529    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
530        for module in &self.modules {
531            module.pre_process(Data::Trade(trade.to_owned()));
532        }
533
534        if !self.matching_engines.contains_key(&trade.instrument_id) {
535            let instrument = {
536                let cache = self.cache.as_ref().borrow();
537                cache.instrument(&trade.instrument_id).cloned()
538            };
539
540            if let Some(instrument) = instrument {
541                self.add_instrument(instrument).unwrap();
542            } else {
543                panic!(
544                    "No matching engine found for instrument {}",
545                    trade.instrument_id
546                );
547            }
548        }
549
550        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
551            matching_engine.process_trade_tick(trade);
552        } else {
553            panic!("Matching engine should be initialized");
554        }
555    }
556
557    pub fn process_bar(&mut self, bar: Bar) {
558        for module in &self.modules {
559            module.pre_process(Data::Bar(bar));
560        }
561
562        if !self.matching_engines.contains_key(&bar.instrument_id()) {
563            let instrument = {
564                let cache = self.cache.as_ref().borrow();
565                cache.instrument(&bar.instrument_id()).cloned()
566            };
567
568            if let Some(instrument) = instrument {
569                self.add_instrument(instrument).unwrap();
570            } else {
571                panic!(
572                    "No matching engine found for instrument {}",
573                    bar.instrument_id()
574                );
575            }
576        }
577
578        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
579            matching_engine.process_bar(&bar);
580        } else {
581            panic!("Matching engine should be initialized");
582        }
583    }
584
585    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
586        // TODO add module preprocessing
587
588        if !self.matching_engines.contains_key(&status.instrument_id) {
589            let instrument = {
590                let cache = self.cache.as_ref().borrow();
591                cache.instrument(&status.instrument_id).cloned()
592            };
593
594            if let Some(instrument) = instrument {
595                self.add_instrument(instrument).unwrap();
596            } else {
597                panic!(
598                    "No matching engine found for instrument {}",
599                    status.instrument_id
600                );
601            }
602        }
603
604        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
605            matching_engine.process_status(status.action);
606        } else {
607            panic!("Matching engine should be initialized");
608        }
609    }
610
611    pub fn process(&mut self, ts_now: UnixNanos) {
612        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
613
614        // Process inflight commands
615        while let Some(inflight) = self.inflight_queue.peek() {
616            if inflight.ts > ts_now {
617                // Future commands remain in the queue
618                break;
619            }
620            // We get the inflight command, remove it from the queue and process it
621            let inflight = self.inflight_queue.pop().unwrap();
622            self.process_trading_command(inflight.command);
623        }
624
625        // Process regular message queue
626        while let Some(command) = self.message_queue.pop_front() {
627            self.process_trading_command(command);
628        }
629    }
630
631    pub fn reset(&mut self) {
632        for module in &self.modules {
633            module.reset();
634        }
635
636        self.generate_fresh_account_state();
637
638        for matching_engine in self.matching_engines.values_mut() {
639            matching_engine.reset();
640        }
641
642        // TODO Clear the inflight and message queues
643        log::info!("Resetting exchange state");
644    }
645
646    pub fn process_trading_command(&mut self, command: TradingCommand) {
647        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
648            let account_id = if let Some(exec_client) = &self.exec_client {
649                exec_client.account_id()
650            } else {
651                panic!("Execution client should be initialized");
652            };
653            match command {
654                TradingCommand::SubmitOrder(mut command) => {
655                    matching_engine.process_order(&mut command.order, account_id);
656                }
657                TradingCommand::ModifyOrder(ref command) => {
658                    matching_engine.process_modify(command, account_id);
659                }
660                TradingCommand::CancelOrder(ref command) => {
661                    matching_engine.process_cancel(command, account_id);
662                }
663                TradingCommand::CancelAllOrders(ref command) => {
664                    matching_engine.process_cancel_all(command, account_id);
665                }
666                TradingCommand::BatchCancelOrders(ref command) => {
667                    matching_engine.process_batch_cancel(command, account_id);
668                }
669                TradingCommand::SubmitOrderList(mut command) => {
670                    for order in &mut command.order_list.orders {
671                        matching_engine.process_order(order, account_id);
672                    }
673                }
674                _ => {}
675            }
676        } else {
677            panic!("Matching engine should be initialized");
678        }
679    }
680
681    pub fn generate_fresh_account_state(&self) {
682        let balances: Vec<AccountBalance> = self
683            .starting_balances
684            .iter()
685            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
686            .collect();
687
688        if let Some(exec_client) = &self.exec_client {
689            exec_client
690                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
691                .unwrap();
692        }
693
694        // Set leverages
695        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
696            margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
697
698            // Set instrument specific leverages
699            for (instrument_id, leverage) in &self.leverages {
700                margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
701            }
702        }
703    }
704}
705
706////////////////////////////////////////////////////////////////////////////////
707// Tests
708////////////////////////////////////////////////////////////////////////////////
709#[cfg(test)]
710mod tests {
711    use std::{
712        cell::RefCell,
713        collections::{BinaryHeap, HashMap},
714        rc::Rc,
715        sync::LazyLock,
716    };
717
718    use nautilus_common::{
719        cache::Cache,
720        clock::TestClock,
721        msgbus::{
722            self,
723            stubs::{get_message_saving_handler, get_saved_messages},
724        },
725    };
726    use nautilus_core::{AtomicTime, UUID4, UnixNanos};
727    use nautilus_execution::{
728        messages::{SubmitOrder, TradingCommand},
729        models::{
730            fee::{FeeModelAny, MakerTakerFeeModel},
731            fill::FillModel,
732            latency::LatencyModel,
733        },
734    };
735    use nautilus_model::{
736        accounts::{AccountAny, MarginAccount},
737        data::{
738            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
739            TradeTick,
740        },
741        enums::{
742            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
743            OmsType, OrderSide, OrderType,
744        },
745        events::AccountState,
746        identifiers::{
747            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
748            VenueOrderId,
749        },
750        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
751        orders::OrderTestBuilder,
752        types::{AccountBalance, Currency, Money, Price, Quantity},
753    };
754    use rstest::rstest;
755    use ustr::Ustr;
756
757    use crate::{
758        exchange::{InflightCommand, SimulatedExchange},
759        execution_client::BacktestExecutionClient,
760    };
761
762    static ATOMIC_TIME: LazyLock<AtomicTime> =
763        LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
764
765    fn get_exchange(
766        venue: Venue,
767        account_type: AccountType,
768        book_type: BookType,
769        cache: Option<Rc<RefCell<Cache>>>,
770    ) -> Rc<RefCell<SimulatedExchange>> {
771        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
772        let clock = Rc::new(RefCell::new(TestClock::new()));
773        let exchange = Rc::new(RefCell::new(
774            SimulatedExchange::new(
775                venue,
776                OmsType::Netting,
777                account_type,
778                vec![Money::new(1000.0, Currency::USD())],
779                None,
780                1.into(),
781                HashMap::new(),
782                vec![],
783                cache.clone(),
784                clock,
785                FillModel::default(),
786                FeeModelAny::MakerTaker(MakerTakerFeeModel),
787                book_type,
788                None,
789                None,
790                None,
791                None,
792                None,
793                None,
794                None,
795                None,
796                None,
797                None,
798            )
799            .unwrap(),
800        ));
801
802        let clock = TestClock::new();
803        let execution_client = BacktestExecutionClient::new(
804            TraderId::default(),
805            AccountId::default(),
806            exchange.clone(),
807            cache.clone(),
808            Rc::new(RefCell::new(clock)),
809            None,
810            None,
811        );
812        exchange
813            .borrow_mut()
814            .register_client(Rc::new(execution_client));
815
816        exchange
817    }
818
819    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
820        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
821        let order = OrderTestBuilder::new(OrderType::Market)
822            .instrument_id(instrument_id)
823            .quantity(Quantity::from(1))
824            .build();
825        TradingCommand::SubmitOrder(
826            SubmitOrder::new(
827                TraderId::default(),
828                ClientId::default(),
829                StrategyId::default(),
830                instrument_id,
831                ClientOrderId::default(),
832                VenueOrderId::default(),
833                order,
834                None,
835                None,
836                UUID4::default(),
837                ts_init,
838            )
839            .unwrap(),
840        )
841    }
842
843    #[rstest]
844    #[should_panic(
845        expected = r#"Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"#
846    )]
847    fn test_venue_mismatch_between_exchange_and_instrument(
848        crypto_perpetual_ethusdt: CryptoPerpetual,
849    ) {
850        let exchange = get_exchange(
851            Venue::new("SIM"),
852            AccountType::Margin,
853            BookType::L1_MBP,
854            None,
855        );
856        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
857        exchange.borrow_mut().add_instrument(instrument).unwrap();
858    }
859
860    #[rstest]
861    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
862    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
863        let exchange = get_exchange(
864            Venue::new("BINANCE"),
865            AccountType::Cash,
866            BookType::L1_MBP,
867            None,
868        );
869        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
870        exchange.borrow_mut().add_instrument(instrument).unwrap();
871    }
872
873    #[rstest]
874    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
875        let exchange = get_exchange(
876            Venue::new("BINANCE"),
877            AccountType::Margin,
878            BookType::L1_MBP,
879            None,
880        );
881        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
882
883        // register instrument
884        exchange.borrow_mut().add_instrument(instrument).unwrap();
885
886        // process tick
887        let quote_tick = QuoteTick::new(
888            crypto_perpetual_ethusdt.id,
889            Price::from("1000"),
890            Price::from("1001"),
891            Quantity::from(1),
892            Quantity::from(1),
893            UnixNanos::default(),
894            UnixNanos::default(),
895        );
896        exchange.borrow_mut().process_quote_tick(&quote_tick);
897
898        let best_bid_price = exchange
899            .borrow()
900            .best_bid_price(crypto_perpetual_ethusdt.id);
901        assert_eq!(best_bid_price, Some(Price::from("1000")));
902        let best_ask_price = exchange
903            .borrow()
904            .best_ask_price(crypto_perpetual_ethusdt.id);
905        assert_eq!(best_ask_price, Some(Price::from("1001")));
906    }
907
908    #[rstest]
909    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
910        let exchange = get_exchange(
911            Venue::new("BINANCE"),
912            AccountType::Margin,
913            BookType::L1_MBP,
914            None,
915        );
916        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
917
918        // register instrument
919        exchange.borrow_mut().add_instrument(instrument).unwrap();
920
921        // process tick
922        let trade_tick = TradeTick::new(
923            crypto_perpetual_ethusdt.id,
924            Price::from("1000"),
925            Quantity::from(1),
926            AggressorSide::Buyer,
927            TradeId::from("1"),
928            UnixNanos::default(),
929            UnixNanos::default(),
930        );
931        exchange.borrow_mut().process_trade_tick(&trade_tick);
932
933        let best_bid_price = exchange
934            .borrow()
935            .best_bid_price(crypto_perpetual_ethusdt.id);
936        assert_eq!(best_bid_price, Some(Price::from("1000")));
937        let best_ask = exchange
938            .borrow()
939            .best_ask_price(crypto_perpetual_ethusdt.id);
940        assert_eq!(best_ask, Some(Price::from("1000")));
941    }
942
943    #[rstest]
944    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
945        let exchange = get_exchange(
946            Venue::new("BINANCE"),
947            AccountType::Margin,
948            BookType::L1_MBP,
949            None,
950        );
951        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
952
953        // register instrument
954        exchange.borrow_mut().add_instrument(instrument).unwrap();
955
956        // process bar
957        let bar = Bar::new(
958            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
959            Price::from("1500.00"),
960            Price::from("1505.00"),
961            Price::from("1490.00"),
962            Price::from("1502.00"),
963            Quantity::from(100),
964            UnixNanos::default(),
965            UnixNanos::default(),
966        );
967        exchange.borrow_mut().process_bar(bar);
968
969        // this will be processed as ticks so both bid and ask will be the same as close of the bar
970        let best_bid_price = exchange
971            .borrow()
972            .best_bid_price(crypto_perpetual_ethusdt.id);
973        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
974        let best_ask_price = exchange
975            .borrow()
976            .best_ask_price(crypto_perpetual_ethusdt.id);
977        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
978    }
979
980    #[rstest]
981    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
982        let exchange = get_exchange(
983            Venue::new("BINANCE"),
984            AccountType::Margin,
985            BookType::L1_MBP,
986            None,
987        );
988        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
989
990        // register instrument
991        exchange.borrow_mut().add_instrument(instrument).unwrap();
992
993        // create both bid and ask based bars
994        // add +1 on ask to make sure it is different from bid
995        let bar_bid = Bar::new(
996            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
997            Price::from("1500.00"),
998            Price::from("1505.00"),
999            Price::from("1490.00"),
1000            Price::from("1502.00"),
1001            Quantity::from(100),
1002            UnixNanos::from(1),
1003            UnixNanos::from(1),
1004        );
1005        let bar_ask = Bar::new(
1006            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1007            Price::from("1501.00"),
1008            Price::from("1506.00"),
1009            Price::from("1491.00"),
1010            Price::from("1503.00"),
1011            Quantity::from(100),
1012            UnixNanos::from(1),
1013            UnixNanos::from(1),
1014        );
1015
1016        // process them
1017        exchange.borrow_mut().process_bar(bar_bid);
1018        exchange.borrow_mut().process_bar(bar_ask);
1019
1020        // current bid and ask prices will be the corresponding close of the ask and bid bar
1021        let best_bid_price = exchange
1022            .borrow()
1023            .best_bid_price(crypto_perpetual_ethusdt.id);
1024        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1025        let best_ask_price = exchange
1026            .borrow()
1027            .best_ask_price(crypto_perpetual_ethusdt.id);
1028        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1029    }
1030
1031    #[rstest]
1032    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1033        let exchange = get_exchange(
1034            Venue::new("BINANCE"),
1035            AccountType::Margin,
1036            BookType::L2_MBP,
1037            None,
1038        );
1039        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1040
1041        // register instrument
1042        exchange.borrow_mut().add_instrument(instrument).unwrap();
1043
1044        // create order book delta at both bid and ask with incremented ts init and sequence
1045        let delta_buy = OrderBookDelta::new(
1046            crypto_perpetual_ethusdt.id,
1047            BookAction::Add,
1048            BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1049            0,
1050            0,
1051            UnixNanos::from(1),
1052            UnixNanos::from(1),
1053        );
1054        let delta_sell = OrderBookDelta::new(
1055            crypto_perpetual_ethusdt.id,
1056            BookAction::Add,
1057            BookOrder::new(
1058                OrderSide::Sell,
1059                Price::from("1001.00"),
1060                Quantity::from(1),
1061                1,
1062            ),
1063            0,
1064            1,
1065            UnixNanos::from(2),
1066            UnixNanos::from(2),
1067        );
1068
1069        // process both deltas
1070        exchange.borrow_mut().process_order_book_delta(delta_buy);
1071        exchange.borrow_mut().process_order_book_delta(delta_sell);
1072
1073        let book = exchange
1074            .borrow()
1075            .get_book(crypto_perpetual_ethusdt.id)
1076            .unwrap()
1077            .clone();
1078        assert_eq!(book.update_count, 2);
1079        assert_eq!(book.sequence, 1);
1080        assert_eq!(book.ts_last, UnixNanos::from(2));
1081        let best_bid_price = exchange
1082            .borrow()
1083            .best_bid_price(crypto_perpetual_ethusdt.id);
1084        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1085        let best_ask_price = exchange
1086            .borrow()
1087            .best_ask_price(crypto_perpetual_ethusdt.id);
1088        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1089    }
1090
1091    #[rstest]
1092    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1093        let exchange = get_exchange(
1094            Venue::new("BINANCE"),
1095            AccountType::Margin,
1096            BookType::L2_MBP,
1097            None,
1098        );
1099        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1100
1101        // register instrument
1102        exchange.borrow_mut().add_instrument(instrument).unwrap();
1103
1104        // create two sell order book deltas with same timestamps and higher sequence
1105        let delta_sell_1 = OrderBookDelta::new(
1106            crypto_perpetual_ethusdt.id,
1107            BookAction::Add,
1108            BookOrder::new(
1109                OrderSide::Sell,
1110                Price::from("1000.00"),
1111                Quantity::from(3),
1112                1,
1113            ),
1114            0,
1115            0,
1116            UnixNanos::from(1),
1117            UnixNanos::from(1),
1118        );
1119        let delta_sell_2 = OrderBookDelta::new(
1120            crypto_perpetual_ethusdt.id,
1121            BookAction::Add,
1122            BookOrder::new(
1123                OrderSide::Sell,
1124                Price::from("1001.00"),
1125                Quantity::from(1),
1126                1,
1127            ),
1128            0,
1129            1,
1130            UnixNanos::from(1),
1131            UnixNanos::from(1),
1132        );
1133        let orderbook_deltas = OrderBookDeltas::new(
1134            crypto_perpetual_ethusdt.id,
1135            vec![delta_sell_1, delta_sell_2],
1136        );
1137
1138        // process both deltas
1139        exchange
1140            .borrow_mut()
1141            .process_order_book_deltas(orderbook_deltas);
1142
1143        let book = exchange
1144            .borrow()
1145            .get_book(crypto_perpetual_ethusdt.id)
1146            .unwrap()
1147            .clone();
1148        assert_eq!(book.update_count, 2);
1149        assert_eq!(book.sequence, 1);
1150        assert_eq!(book.ts_last, UnixNanos::from(1));
1151        let best_bid_price = exchange
1152            .borrow()
1153            .best_bid_price(crypto_perpetual_ethusdt.id);
1154        // no bid orders in orderbook deltas
1155        assert_eq!(best_bid_price, None);
1156        let best_ask_price = exchange
1157            .borrow()
1158            .best_ask_price(crypto_perpetual_ethusdt.id);
1159        // best ask price is the first order in orderbook deltas
1160        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1161    }
1162
1163    #[rstest]
1164    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1165        let exchange = get_exchange(
1166            Venue::new("BINANCE"),
1167            AccountType::Margin,
1168            BookType::L2_MBP,
1169            None,
1170        );
1171        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1172
1173        // register instrument
1174        exchange.borrow_mut().add_instrument(instrument).unwrap();
1175
1176        let instrument_status = InstrumentStatus::new(
1177            crypto_perpetual_ethusdt.id,
1178            MarketStatusAction::Close, // close the market
1179            UnixNanos::from(1),
1180            UnixNanos::from(1),
1181            None,
1182            None,
1183            None,
1184            None,
1185            None,
1186        );
1187
1188        exchange
1189            .borrow_mut()
1190            .process_instrument_status(instrument_status);
1191
1192        let market_status = exchange
1193            .borrow()
1194            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1195            .unwrap()
1196            .market_status;
1197        assert_eq!(market_status, MarketStatus::Closed);
1198    }
1199
1200    #[rstest]
1201    fn test_accounting() {
1202        let account_type = AccountType::Margin;
1203        let mut cache = Cache::default();
1204        let handler = get_message_saving_handler::<AccountState>(None);
1205        msgbus::register(Ustr::from("Portfolio.update_account"), handler.clone());
1206        let margin_account = MarginAccount::new(
1207            AccountState::new(
1208                AccountId::from("SIM-001"),
1209                account_type,
1210                vec![AccountBalance::new(
1211                    Money::from("1000 USD"),
1212                    Money::from("0 USD"),
1213                    Money::from("1000 USD"),
1214                )],
1215                vec![],
1216                false,
1217                UUID4::default(),
1218                UnixNanos::default(),
1219                UnixNanos::default(),
1220                None,
1221            ),
1222            false,
1223        );
1224        let () = cache
1225            .add_account(AccountAny::Margin(margin_account))
1226            .unwrap();
1227        // build indexes
1228        cache.build_index();
1229
1230        let exchange = get_exchange(
1231            Venue::new("SIM"),
1232            account_type,
1233            BookType::L2_MBP,
1234            Some(Rc::new(RefCell::new(cache))),
1235        );
1236        exchange.borrow_mut().initialize_account();
1237
1238        // Test adjust account, increase balance by 500 USD
1239        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1240
1241        // Check if we received two messages, one for initial account state and one for adjusted account state
1242        let messages = get_saved_messages::<AccountState>(handler);
1243        assert_eq!(messages.len(), 2);
1244        let account_state_first = messages.first().unwrap();
1245        let account_state_second = messages.last().unwrap();
1246
1247        assert_eq!(account_state_first.balances.len(), 1);
1248        let current_balance = account_state_first.balances[0];
1249        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1250        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1251        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1252
1253        assert_eq!(account_state_second.balances.len(), 1);
1254        let current_balance = account_state_second.balances[0];
1255        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1256        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1257        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1258    }
1259
1260    #[rstest]
1261    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1262        // Create 3 inflight commands with different timestamps and counters
1263        let inflight1 = InflightCommand::new(
1264            UnixNanos::from(100),
1265            1,
1266            create_submit_order_command(UnixNanos::from(100)),
1267        );
1268        let inflight2 = InflightCommand::new(
1269            UnixNanos::from(200),
1270            2,
1271            create_submit_order_command(UnixNanos::from(200)),
1272        );
1273        let inflight3 = InflightCommand::new(
1274            UnixNanos::from(100),
1275            2,
1276            create_submit_order_command(UnixNanos::from(100)),
1277        );
1278
1279        // Create a binary heap and push the inflight commands
1280        let mut inflight_heap = BinaryHeap::new();
1281        inflight_heap.push(inflight1);
1282        inflight_heap.push(inflight2);
1283        inflight_heap.push(inflight3);
1284
1285        // Pop the inflight commands and check if they are in the correct order
1286        // by our custom ordering with counter and timestamp
1287        let first = inflight_heap.pop().unwrap();
1288        let second = inflight_heap.pop().unwrap();
1289        let third = inflight_heap.pop().unwrap();
1290
1291        assert_eq!(first.ts, UnixNanos::from(100));
1292        assert_eq!(first.counter, 1);
1293        assert_eq!(second.ts, UnixNanos::from(100));
1294        assert_eq!(second.counter, 2);
1295        assert_eq!(third.ts, UnixNanos::from(200));
1296        assert_eq!(third.counter, 2);
1297    }
1298
1299    #[rstest]
1300    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1301        let exchange = get_exchange(
1302            Venue::new("BINANCE"),
1303            AccountType::Margin,
1304            BookType::L2_MBP,
1305            None,
1306        );
1307
1308        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1309        exchange.borrow_mut().add_instrument(instrument).unwrap();
1310
1311        let command1 = create_submit_order_command(UnixNanos::from(100));
1312        let command2 = create_submit_order_command(UnixNanos::from(200));
1313
1314        exchange.borrow_mut().send(command1);
1315        exchange.borrow_mut().send(command2);
1316
1317        // Verify that message queue has 2 commands and inflight queue is empty
1318        // as we are not using latency model
1319        assert_eq!(exchange.borrow().message_queue.len(), 2);
1320        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1321
1322        // Process command and check that queues is empty
1323        exchange.borrow_mut().process(UnixNanos::from(300));
1324        assert_eq!(exchange.borrow().message_queue.len(), 0);
1325        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1326    }
1327
1328    #[rstest]
1329    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1330        let latency_model = LatencyModel::new(
1331            UnixNanos::from(100),
1332            UnixNanos::from(200),
1333            UnixNanos::from(300),
1334            UnixNanos::from(100),
1335        );
1336        let exchange = get_exchange(
1337            Venue::new("BINANCE"),
1338            AccountType::Margin,
1339            BookType::L2_MBP,
1340            None,
1341        );
1342        exchange.borrow_mut().set_latency_model(latency_model);
1343
1344        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1345        exchange.borrow_mut().add_instrument(instrument).unwrap();
1346
1347        let command1 = create_submit_order_command(UnixNanos::from(100));
1348        let command2 = create_submit_order_command(UnixNanos::from(150));
1349        exchange.borrow_mut().send(command1);
1350        exchange.borrow_mut().send(command2);
1351
1352        // Verify that inflight queue has 2 commands and message queue is empty
1353        assert_eq!(exchange.borrow().message_queue.len(), 0);
1354        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1355        // First inflight command should have timestamp at 100 and 200 insert latency
1356        assert_eq!(
1357            exchange.borrow().inflight_queue.iter().nth(0).unwrap().ts,
1358            UnixNanos::from(300)
1359        );
1360        // Second inflight command should have timestamp at 150 and 200 insert latency
1361        assert_eq!(
1362            exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1363            UnixNanos::from(350)
1364        );
1365
1366        // Process at timestamp 350, and test that only first command is processed
1367        exchange.borrow_mut().process(UnixNanos::from(320));
1368        assert_eq!(exchange.borrow().message_queue.len(), 0);
1369        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1370        assert_eq!(
1371            exchange.borrow().inflight_queue.iter().nth(0).unwrap().ts,
1372            UnixNanos::from(350)
1373        );
1374    }
1375}