1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{BinaryHeap, VecDeque},
25 fmt::Debug,
26 rc::Rc,
27};
28
29use ahash::AHashMap;
30use nautilus_common::{
31 cache::Cache, clients::ExecutionClient, clock::Clock, messages::execution::TradingCommand,
32};
33use nautilus_core::{
34 UnixNanos,
35 correctness::{FAILED, check_equal},
36};
37use nautilus_execution::{
38 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
39 models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
40};
41use nautilus_model::{
42 accounts::AccountAny,
43 data::{
44 Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
45 QuoteTick, TradeTick,
46 },
47 enums::{AccountType, BookType, OmsType},
48 identifiers::{InstrumentId, Venue},
49 instruments::{Instrument, InstrumentAny},
50 orderbook::OrderBook,
51 orders::PassiveOrderAny,
52 types::{AccountBalance, Currency, Money, Price},
53};
54use rust_decimal::Decimal;
55
56use crate::modules::SimulationModule;
57
/// A trading command held in the in-flight queue together with its ordering keys.
#[derive(Debug, Eq, PartialEq)]
struct InflightCommand {
    /// Primary ordering key; `SimulatedExchange::process` releases the command once
    /// this timestamp is no longer greater than the `ts_now` it is given.
    timestamp: UnixNanos,
    /// Secondary ordering key used to break ties between equal timestamps.
    counter: u32,
    /// The wrapped trading command.
    command: TradingCommand,
}
67
68impl InflightCommand {
69 const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
70 Self {
71 timestamp,
72 counter,
73 command,
74 }
75 }
76}
77
78impl Ord for InflightCommand {
79 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
80 other
82 .timestamp
83 .cmp(&self.timestamp)
84 .then_with(|| other.counter.cmp(&self.counter))
85 }
86}
87
88impl PartialOrd for InflightCommand {
89 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
90 Some(self.cmp(other))
91 }
92}
93
/// A simulated trading venue that routes trading commands and market data to one
/// order matching engine per instrument.
pub struct SimulatedExchange {
    /// The venue this exchange simulates; instruments added must belong to it.
    pub id: Venue,
    /// Order management system type handed to each matching engine.
    pub oms_type: OmsType,
    /// Account type handed to each matching engine.
    pub account_type: AccountType,
    /// Balances used to build a fresh account state.
    starting_balances: Vec<Money>,
    /// Book type handed to each matching engine.
    book_type: BookType,
    /// Default leverage applied when a fresh margin account state is generated.
    default_leverage: Decimal,
    /// Execution client; `None` until `register_client` is called.
    exec_client: Option<Rc<dyn ExecutionClient>>,
    /// Optional base currency; when set, only a single starting balance is accepted.
    pub base_currency: Option<Currency>,
    /// Fee model cloned into each matching engine.
    fee_model: FeeModelAny,
    /// Fill model cloned into each matching engine.
    fill_model: FillModel,
    /// Optional latency model; when present, queued commands go to the in-flight queue.
    latency_model: Option<Box<dyn LatencyModel>>,
    /// Instruments registered through `add_instrument`.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    /// One matching engine per registered instrument.
    matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
    /// Instrument-specific leverages applied when a fresh margin account state is generated.
    leverages: AHashMap<InstrumentId, Decimal>,
    /// Simulation modules notified of market data and of resets.
    modules: Vec<Box<dyn SimulationModule>>,
    /// Clock used to timestamp generated account states.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up instruments and accounts.
    cache: Rc<RefCell<Cache>>,
    /// Commands queued by `send` when no latency model is set; drained by `process`.
    message_queue: VecDeque<TradingCommand>,
    /// Commands queued by `send` when a latency model is set; released by `process`.
    inflight_queue: BinaryHeap<InflightCommand>,
    /// Per-timestamp counters used to order in-flight commands with equal timestamps.
    inflight_counter: AHashMap<UnixNanos, u32>,
    // The following nine flags are forwarded to `OrderMatchingEngineConfig::new`
    // when an instrument is added.
    bar_execution: bool,
    trade_execution: bool,
    liquidity_consumption: bool,
    reject_stop_orders: bool,
    support_gtd_orders: bool,
    support_contingent_orders: bool,
    use_position_ids: bool,
    use_random_ids: bool,
    use_reduce_only: bool,
    /// When `false`, `send` processes commands immediately instead of queueing them.
    use_message_queue: bool,
    // NOTE(review): not read anywhere in the visible part of this file.
    allow_cash_borrowing: bool,
    /// When `true`, `adjust_account` is a no-op.
    frozen_account: bool,
    /// Price protection points; `0` is passed to the matching engine config as `None`.
    price_protection_points: u32,
}
144
145impl Debug for SimulatedExchange {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct(stringify!(SimulatedExchange))
148 .field("id", &self.id)
149 .field("account_type", &self.account_type)
150 .finish()
151 }
152}
153
154impl SimulatedExchange {
155 #[allow(clippy::too_many_arguments)]
163 pub fn new(
164 venue: Venue,
165 oms_type: OmsType,
166 account_type: AccountType,
167 starting_balances: Vec<Money>,
168 base_currency: Option<Currency>,
169 default_leverage: Decimal,
170 leverages: AHashMap<InstrumentId, Decimal>,
171 modules: Vec<Box<dyn SimulationModule>>,
172 cache: Rc<RefCell<Cache>>,
173 clock: Rc<RefCell<dyn Clock>>,
174 fill_model: FillModel,
175 fee_model: FeeModelAny,
176 book_type: BookType,
177 latency_model: Option<Box<dyn LatencyModel>>,
178 bar_execution: Option<bool>,
179 trade_execution: Option<bool>,
180 liquidity_consumption: Option<bool>,
181 reject_stop_orders: Option<bool>,
182 support_gtd_orders: Option<bool>,
183 support_contingent_orders: Option<bool>,
184 use_position_ids: Option<bool>,
185 use_random_ids: Option<bool>,
186 use_reduce_only: Option<bool>,
187 use_message_queue: Option<bool>,
188 allow_cash_borrowing: Option<bool>,
189 frozen_account: Option<bool>,
190 price_protection_points: Option<u32>,
191 ) -> anyhow::Result<Self> {
192 if starting_balances.is_empty() {
193 anyhow::bail!("Starting balances must be provided")
194 }
195 if base_currency.is_some() && starting_balances.len() > 1 {
196 anyhow::bail!("single-currency account has multiple starting currencies")
197 }
198 Ok(Self {
200 id: venue,
201 oms_type,
202 account_type,
203 starting_balances,
204 book_type,
205 default_leverage,
206 exec_client: None,
207 base_currency,
208 fee_model,
209 fill_model,
210 latency_model,
211 instruments: AHashMap::new(),
212 matching_engines: AHashMap::new(),
213 leverages,
214 modules,
215 clock,
216 cache,
217 message_queue: VecDeque::new(),
218 inflight_queue: BinaryHeap::new(),
219 inflight_counter: AHashMap::new(),
220 bar_execution: bar_execution.unwrap_or(true),
221 trade_execution: trade_execution.unwrap_or(true),
222 liquidity_consumption: liquidity_consumption.unwrap_or(true),
223 reject_stop_orders: reject_stop_orders.unwrap_or(true),
224 support_gtd_orders: support_gtd_orders.unwrap_or(true),
225 support_contingent_orders: support_contingent_orders.unwrap_or(true),
226 use_position_ids: use_position_ids.unwrap_or(true),
227 use_random_ids: use_random_ids.unwrap_or(false),
228 use_reduce_only: use_reduce_only.unwrap_or(true),
229 use_message_queue: use_message_queue.unwrap_or(true),
230 allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
231 frozen_account: frozen_account.unwrap_or(false),
232 price_protection_points: price_protection_points.unwrap_or(0),
233 })
234 }
235
    /// Registers the execution client used by this exchange, replacing any
    /// previously registered one.
    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
        self.exec_client = Some(client);
    }
239
240 pub fn set_fill_model(&mut self, fill_model: FillModel) {
241 for matching_engine in self.matching_engines.values_mut() {
242 matching_engine.set_fill_model(fill_model.clone());
243 log::info!(
244 "Setting fill model for {} to {}",
245 matching_engine.venue,
246 self.fill_model
247 );
248 }
249 self.fill_model = fill_model;
250 }
251
    /// Sets the latency model, replacing any previous one.
    ///
    /// With a latency model set (and the message queue in use), `send` places
    /// commands on the in-flight queue instead of the plain message queue.
    pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
        self.latency_model = Some(latency_model);
    }
255
    /// Initializes the account by generating a fresh account state from the
    /// starting balances.
    pub fn initialize_account(&mut self) {
        self.generate_fresh_account_state();
    }
259
260 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
270 check_equal(
271 &instrument.id().venue,
272 &self.id,
273 "Venue of instrument id",
274 "Venue of simulated exchange",
275 )
276 .expect(FAILED);
277
278 if self.account_type == AccountType::Cash
279 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
280 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
281 {
282 anyhow::bail!("Cash account cannot trade futures or perpetuals")
283 }
284
285 self.instruments.insert(instrument.id(), instrument.clone());
286
287 let price_protection = if self.price_protection_points == 0 {
288 None
289 } else {
290 Some(self.price_protection_points)
291 };
292
293 let matching_engine_config = OrderMatchingEngineConfig::new(
294 self.bar_execution,
295 self.trade_execution,
296 self.liquidity_consumption,
297 self.reject_stop_orders,
298 self.support_gtd_orders,
299 self.support_contingent_orders,
300 self.use_position_ids,
301 self.use_random_ids,
302 self.use_reduce_only,
303 )
304 .with_price_protection_points(price_protection);
305 let instrument_id = instrument.id();
306 let matching_engine = OrderMatchingEngine::new(
307 instrument,
308 self.instruments.len() as u32,
309 self.fill_model.clone(),
310 self.fee_model.clone(),
311 self.book_type,
312 self.oms_type,
313 self.account_type,
314 self.clock.clone(),
315 Rc::clone(&self.cache),
316 matching_engine_config,
317 );
318 self.matching_engines.insert(instrument_id, matching_engine);
319
320 log::info!("Added instrument {instrument_id} and created matching engine");
321 Ok(())
322 }
323
324 #[must_use]
325 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
326 self.matching_engines
327 .get(&instrument_id)
328 .and_then(OrderMatchingEngine::best_bid_price)
329 }
330
331 #[must_use]
332 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
333 self.matching_engines
334 .get(&instrument_id)
335 .and_then(OrderMatchingEngine::best_ask_price)
336 }
337
338 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
339 self.matching_engines
340 .get(&instrument_id)
341 .map(OrderMatchingEngine::get_book)
342 }
343
    /// Returns a reference to the matching engine for `instrument_id`, or `None`
    /// if no engine has been created for that instrument.
    #[must_use]
    pub fn get_matching_engine(
        &self,
        instrument_id: &InstrumentId,
    ) -> Option<&OrderMatchingEngine> {
        self.matching_engines.get(instrument_id)
    }
351
    /// Returns a reference to the map of all matching engines keyed by instrument ID.
    #[must_use]
    pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
        &self.matching_engines
    }
356
357 #[must_use]
358 pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
359 let mut books = AHashMap::new();
360 for (instrument_id, matching_engine) in &self.matching_engines {
361 books.insert(*instrument_id, matching_engine.get_book().clone());
362 }
363 books
364 }
365
366 #[must_use]
367 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
368 instrument_id
369 .and_then(|id| {
370 self.matching_engines
371 .get(&id)
372 .map(OrderMatchingEngine::get_open_orders)
373 })
374 .unwrap_or_else(|| {
375 self.matching_engines
376 .values()
377 .flat_map(OrderMatchingEngine::get_open_orders)
378 .collect()
379 })
380 }
381
382 #[must_use]
383 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
384 instrument_id
385 .and_then(|id| {
386 self.matching_engines
387 .get(&id)
388 .map(|engine| engine.get_open_bid_orders().to_vec())
389 })
390 .unwrap_or_else(|| {
391 self.matching_engines
392 .values()
393 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
394 .collect()
395 })
396 }
397
398 #[must_use]
399 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
400 instrument_id
401 .and_then(|id| {
402 self.matching_engines
403 .get(&id)
404 .map(|engine| engine.get_open_ask_orders().to_vec())
405 })
406 .unwrap_or_else(|| {
407 self.matching_engines
408 .values()
409 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
410 .collect()
411 })
412 }
413
414 #[must_use]
418 pub fn get_account(&self) -> Option<AccountAny> {
419 self.exec_client
420 .as_ref()
421 .map(|client| client.get_account().unwrap())
422 }
423
424 pub fn adjust_account(&mut self, adjustment: Money) {
428 if self.frozen_account {
429 return;
431 }
432
433 if let Some(exec_client) = &self.exec_client {
434 let venue = exec_client.venue();
435 println!("Adjusting account for venue {venue}");
436 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
437 match account.balance(Some(adjustment.currency)) {
438 Some(balance) => {
439 let mut current_balance = *balance;
440 current_balance.total += adjustment;
441 current_balance.free += adjustment;
442
443 let margins = match account {
444 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
445 _ => AHashMap::new(),
446 };
447
448 if let Some(exec_client) = &self.exec_client {
449 exec_client
450 .generate_account_state(
451 vec![current_balance],
452 margins.values().copied().collect(),
453 true,
454 self.clock.borrow().timestamp_ns(),
455 )
456 .unwrap();
457 }
458 }
459 None => {
460 log::error!(
461 "Cannot adjust account: no balance for currency {}",
462 adjustment.currency
463 );
464 }
465 }
466 } else {
467 log::error!("Cannot adjust account: no account for venue {venue}");
468 }
469 }
470 }
471
472 pub fn send(&mut self, command: TradingCommand) {
473 if !self.use_message_queue {
474 self.process_trading_command(command);
475 } else if self.latency_model.is_none() {
476 self.message_queue.push_back(command);
477 } else {
478 let (timestamp, counter) = self.generate_inflight_command(&command);
479 self.inflight_queue
480 .push(InflightCommand::new(timestamp, counter, command));
481 }
482 }
483
484 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
488 if let Some(latency_model) = &self.latency_model {
489 let ts = match command {
490 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
491 command.ts_init() + latency_model.get_insert_latency()
492 }
493 TradingCommand::ModifyOrder(_) => {
494 command.ts_init() + latency_model.get_update_latency()
495 }
496 TradingCommand::CancelOrder(_)
497 | TradingCommand::CancelAllOrders(_)
498 | TradingCommand::BatchCancelOrders(_) => {
499 command.ts_init() + latency_model.get_delete_latency()
500 }
501 _ => panic!("Cannot handle command: {command:?}"),
502 };
503
504 let counter = self
505 .inflight_counter
506 .entry(ts)
507 .and_modify(|e| *e += 1)
508 .or_insert(1);
509
510 (ts, *counter)
511 } else {
512 panic!("Latency model should be initialized");
513 }
514 }
515
516 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
520 for module in &self.modules {
521 module.pre_process(Data::Delta(delta));
522 }
523
524 if !self.matching_engines.contains_key(&delta.instrument_id) {
525 let instrument = {
526 let cache = self.cache.as_ref().borrow();
527 cache.instrument(&delta.instrument_id).cloned()
528 };
529
530 if let Some(instrument) = instrument {
531 self.add_instrument(instrument).unwrap();
532 } else {
533 panic!(
534 "No matching engine found for instrument {}",
535 delta.instrument_id
536 );
537 }
538 }
539
540 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
541 matching_engine.process_order_book_delta(&delta).unwrap();
542 } else {
543 panic!("Matching engine should be initialized");
544 }
545 }
546
547 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
551 for module in &self.modules {
552 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
553 }
554
555 if !self.matching_engines.contains_key(&deltas.instrument_id) {
556 let instrument = {
557 let cache = self.cache.as_ref().borrow();
558 cache.instrument(&deltas.instrument_id).cloned()
559 };
560
561 if let Some(instrument) = instrument {
562 self.add_instrument(instrument).unwrap();
563 } else {
564 panic!(
565 "No matching engine found for instrument {}",
566 deltas.instrument_id
567 );
568 }
569 }
570
571 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
572 matching_engine.process_order_book_deltas(&deltas).unwrap();
573 } else {
574 panic!("Matching engine should be initialized");
575 }
576 }
577
578 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
582 for module in &self.modules {
583 module.pre_process(Data::Quote(quote.to_owned()));
584 }
585
586 if !self.matching_engines.contains_key("e.instrument_id) {
587 let instrument = {
588 let cache = self.cache.as_ref().borrow();
589 cache.instrument("e.instrument_id).cloned()
590 };
591
592 if let Some(instrument) = instrument {
593 self.add_instrument(instrument).unwrap();
594 } else {
595 panic!(
596 "No matching engine found for instrument {}",
597 quote.instrument_id
598 );
599 }
600 }
601
602 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
603 matching_engine.process_quote_tick(quote);
604 } else {
605 panic!("Matching engine should be initialized");
606 }
607 }
608
609 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
613 for module in &self.modules {
614 module.pre_process(Data::Trade(trade.to_owned()));
615 }
616
617 if !self.matching_engines.contains_key(&trade.instrument_id) {
618 let instrument = {
619 let cache = self.cache.as_ref().borrow();
620 cache.instrument(&trade.instrument_id).cloned()
621 };
622
623 if let Some(instrument) = instrument {
624 self.add_instrument(instrument).unwrap();
625 } else {
626 panic!(
627 "No matching engine found for instrument {}",
628 trade.instrument_id
629 );
630 }
631 }
632
633 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
634 matching_engine.process_trade_tick(trade);
635 } else {
636 panic!("Matching engine should be initialized");
637 }
638 }
639
640 pub fn process_bar(&mut self, bar: Bar) {
644 for module in &self.modules {
645 module.pre_process(Data::Bar(bar));
646 }
647
648 if !self.matching_engines.contains_key(&bar.instrument_id()) {
649 let instrument = {
650 let cache = self.cache.as_ref().borrow();
651 cache.instrument(&bar.instrument_id()).cloned()
652 };
653
654 if let Some(instrument) = instrument {
655 self.add_instrument(instrument).unwrap();
656 } else {
657 panic!(
658 "No matching engine found for instrument {}",
659 bar.instrument_id()
660 );
661 }
662 }
663
664 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
665 matching_engine.process_bar(&bar);
666 } else {
667 panic!("Matching engine should be initialized");
668 }
669 }
670
671 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
675 if !self.matching_engines.contains_key(&status.instrument_id) {
678 let instrument = {
679 let cache = self.cache.as_ref().borrow();
680 cache.instrument(&status.instrument_id).cloned()
681 };
682
683 if let Some(instrument) = instrument {
684 self.add_instrument(instrument).unwrap();
685 } else {
686 panic!(
687 "No matching engine found for instrument {}",
688 status.instrument_id
689 );
690 }
691 }
692
693 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
694 matching_engine.process_status(status.action);
695 } else {
696 panic!("Matching engine should be initialized");
697 }
698 }
699
700 pub fn process(&mut self, ts_now: UnixNanos) {
704 while let Some(inflight) = self.inflight_queue.peek() {
708 if inflight.timestamp > ts_now {
709 break;
711 }
712 let inflight = self.inflight_queue.pop().unwrap();
714 self.process_trading_command(inflight.command);
715 }
716
717 while let Some(command) = self.message_queue.pop_front() {
719 self.process_trading_command(command);
720 }
721 }
722
723 pub fn reset(&mut self) {
724 for module in &self.modules {
725 module.reset();
726 }
727
728 self.generate_fresh_account_state();
729
730 for matching_engine in self.matching_engines.values_mut() {
731 matching_engine.reset();
732 }
733
734 log::info!("Resetting exchange state");
736 }
737
738 pub fn process_trading_command(&mut self, command: TradingCommand) {
742 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
743 let account_id = if let Some(exec_client) = &self.exec_client {
744 exec_client.account_id()
745 } else {
746 panic!("Execution client should be initialized");
747 };
748 match command {
749 TradingCommand::SubmitOrder(mut command) => {
750 matching_engine.process_order(&mut command.order, account_id);
751 }
752 TradingCommand::ModifyOrder(ref command) => {
753 matching_engine.process_modify(command, account_id);
754 }
755 TradingCommand::CancelOrder(ref command) => {
756 matching_engine.process_cancel(command, account_id);
757 }
758 TradingCommand::CancelAllOrders(ref command) => {
759 matching_engine.process_cancel_all(command, account_id);
760 }
761 TradingCommand::BatchCancelOrders(ref command) => {
762 matching_engine.process_batch_cancel(command, account_id);
763 }
764 TradingCommand::SubmitOrderList(mut command) => {
765 for order in &mut command.order_list.orders {
766 matching_engine.process_order(order, account_id);
767 }
768 }
769 _ => {}
770 }
771 } else {
772 panic!(
773 "Matching engine not found for instrument {}",
774 command.instrument_id()
775 );
776 }
777 }
778
779 pub fn generate_fresh_account_state(&self) {
783 let balances: Vec<AccountBalance> = self
784 .starting_balances
785 .iter()
786 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
787 .collect();
788
789 if let Some(exec_client) = &self.exec_client {
790 exec_client
791 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
792 .unwrap();
793 }
794
795 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
797 margin_account.set_default_leverage(self.default_leverage);
798
799 for (instrument_id, leverage) in &self.leverages {
801 margin_account.set_leverage(*instrument_id, *leverage);
802 }
803 }
804 }
805}
806
807#[cfg(test)]
808mod tests {
809 use std::{cell::RefCell, collections::BinaryHeap, rc::Rc};
810
811 use ahash::AHashMap;
812 use nautilus_common::{
813 cache::Cache,
814 clock::TestClock,
815 messages::execution::{SubmitOrder, TradingCommand},
816 msgbus::{
817 self,
818 stubs::{get_message_saving_handler, get_saved_messages},
819 },
820 };
821 use nautilus_core::{UUID4, UnixNanos};
822 use nautilus_execution::models::{
823 fee::{FeeModelAny, MakerTakerFeeModel},
824 fill::FillModel,
825 latency::StaticLatencyModel,
826 };
827 use nautilus_model::{
828 accounts::{AccountAny, MarginAccount},
829 data::{
830 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
831 TradeTick,
832 },
833 enums::{
834 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
835 OmsType, OrderSide, OrderType,
836 },
837 events::AccountState,
838 identifiers::{AccountId, InstrumentId, StrategyId, TradeId, TraderId, Venue},
839 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
840 orders::OrderTestBuilder,
841 stubs::TestDefault,
842 types::{AccountBalance, Currency, Money, Price, Quantity},
843 };
844 use rstest::rstest;
845
846 use crate::{
847 exchange::{InflightCommand, SimulatedExchange},
848 execution_client::BacktestExecutionClient,
849 };
850
851 fn get_exchange(
852 venue: Venue,
853 account_type: AccountType,
854 book_type: BookType,
855 cache: Option<Rc<RefCell<Cache>>>,
856 ) -> Rc<RefCell<SimulatedExchange>> {
857 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
858 let clock = Rc::new(RefCell::new(TestClock::new()));
859 let exchange = Rc::new(RefCell::new(
860 SimulatedExchange::new(
861 venue,
862 OmsType::Netting,
863 account_type,
864 vec![Money::new(1000.0, Currency::USD())],
865 None,
866 1.into(),
867 AHashMap::new(),
868 vec![],
869 cache.clone(),
870 clock,
871 FillModel::default(),
872 FeeModelAny::MakerTaker(MakerTakerFeeModel),
873 book_type,
874 None, None, None, None, None, None, None, None, None, None, None, None, None, None, )
889 .unwrap(),
890 ));
891
892 let clock = TestClock::new();
893 let execution_client = BacktestExecutionClient::new(
894 TraderId::test_default(),
895 AccountId::test_default(),
896 exchange.clone(),
897 cache,
898 Rc::new(RefCell::new(clock)),
899 None,
900 None,
901 );
902 exchange
903 .borrow_mut()
904 .register_client(Rc::new(execution_client));
905
906 exchange
907 }
908
909 fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
910 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
911 let order = OrderTestBuilder::new(OrderType::Market)
912 .instrument_id(instrument_id)
913 .quantity(Quantity::from(1))
914 .build();
915 TradingCommand::SubmitOrder(SubmitOrder::new(
916 TraderId::test_default(),
917 None,
918 StrategyId::test_default(),
919 instrument_id,
920 order,
921 None,
922 None,
923 None, UUID4::default(),
925 ts_init,
926 ))
927 }
928
929 #[rstest]
930 #[should_panic(
931 expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
932 )]
933 fn test_venue_mismatch_between_exchange_and_instrument(
934 crypto_perpetual_ethusdt: CryptoPerpetual,
935 ) {
936 let exchange = get_exchange(
937 Venue::new("SIM"),
938 AccountType::Margin,
939 BookType::L1_MBP,
940 None,
941 );
942 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
943 exchange.borrow_mut().add_instrument(instrument).unwrap();
944 }
945
946 #[rstest]
947 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
948 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
949 let exchange = get_exchange(
950 Venue::new("BINANCE"),
951 AccountType::Cash,
952 BookType::L1_MBP,
953 None,
954 );
955 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
956 exchange.borrow_mut().add_instrument(instrument).unwrap();
957 }
958
959 #[rstest]
960 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
961 let exchange = get_exchange(
962 Venue::new("BINANCE"),
963 AccountType::Margin,
964 BookType::L1_MBP,
965 None,
966 );
967 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
968
969 exchange.borrow_mut().add_instrument(instrument).unwrap();
971
972 let quote_tick = QuoteTick::new(
974 crypto_perpetual_ethusdt.id,
975 Price::from("1000.00"),
976 Price::from("1001.00"),
977 Quantity::from("1.000"),
978 Quantity::from("1.000"),
979 UnixNanos::default(),
980 UnixNanos::default(),
981 );
982 exchange.borrow_mut().process_quote_tick("e_tick);
983
984 let best_bid_price = exchange
985 .borrow()
986 .best_bid_price(crypto_perpetual_ethusdt.id);
987 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
988 let best_ask_price = exchange
989 .borrow()
990 .best_ask_price(crypto_perpetual_ethusdt.id);
991 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
992 }
993
994 #[rstest]
995 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
996 let exchange = get_exchange(
997 Venue::new("BINANCE"),
998 AccountType::Margin,
999 BookType::L1_MBP,
1000 None,
1001 );
1002 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1003
1004 exchange.borrow_mut().add_instrument(instrument).unwrap();
1006
1007 let trade_tick = TradeTick::new(
1009 crypto_perpetual_ethusdt.id,
1010 Price::from("1000.00"),
1011 Quantity::from("1.000"),
1012 AggressorSide::Buyer,
1013 TradeId::from("1"),
1014 UnixNanos::default(),
1015 UnixNanos::default(),
1016 );
1017 exchange.borrow_mut().process_trade_tick(&trade_tick);
1018
1019 let best_bid_price = exchange
1020 .borrow()
1021 .best_bid_price(crypto_perpetual_ethusdt.id);
1022 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1023 let best_ask = exchange
1024 .borrow()
1025 .best_ask_price(crypto_perpetual_ethusdt.id);
1026 assert_eq!(best_ask, Some(Price::from("1000.00")));
1027 }
1028
1029 #[rstest]
1030 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1031 let exchange = get_exchange(
1032 Venue::new("BINANCE"),
1033 AccountType::Margin,
1034 BookType::L1_MBP,
1035 None,
1036 );
1037 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1038
1039 exchange.borrow_mut().add_instrument(instrument).unwrap();
1041
1042 let bar = Bar::new(
1044 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1045 Price::from("1500.00"),
1046 Price::from("1505.00"),
1047 Price::from("1490.00"),
1048 Price::from("1502.00"),
1049 Quantity::from("100.000"),
1050 UnixNanos::default(),
1051 UnixNanos::default(),
1052 );
1053 exchange.borrow_mut().process_bar(bar);
1054
1055 let best_bid_price = exchange
1057 .borrow()
1058 .best_bid_price(crypto_perpetual_ethusdt.id);
1059 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1060 let best_ask_price = exchange
1061 .borrow()
1062 .best_ask_price(crypto_perpetual_ethusdt.id);
1063 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1064 }
1065
1066 #[rstest]
1067 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1068 let exchange = get_exchange(
1069 Venue::new("BINANCE"),
1070 AccountType::Margin,
1071 BookType::L1_MBP,
1072 None,
1073 );
1074 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1075
1076 exchange.borrow_mut().add_instrument(instrument).unwrap();
1078
1079 let bar_bid = Bar::new(
1082 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1083 Price::from("1500.00"),
1084 Price::from("1505.00"),
1085 Price::from("1490.00"),
1086 Price::from("1502.00"),
1087 Quantity::from("100.000"),
1088 UnixNanos::from(1),
1089 UnixNanos::from(1),
1090 );
1091 let bar_ask = Bar::new(
1092 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1093 Price::from("1501.00"),
1094 Price::from("1506.00"),
1095 Price::from("1491.00"),
1096 Price::from("1503.00"),
1097 Quantity::from("100.000"),
1098 UnixNanos::from(1),
1099 UnixNanos::from(1),
1100 );
1101
1102 exchange.borrow_mut().process_bar(bar_bid);
1104 exchange.borrow_mut().process_bar(bar_ask);
1105
1106 let best_bid_price = exchange
1108 .borrow()
1109 .best_bid_price(crypto_perpetual_ethusdt.id);
1110 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1111 let best_ask_price = exchange
1112 .borrow()
1113 .best_ask_price(crypto_perpetual_ethusdt.id);
1114 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1115 }
1116
1117 #[rstest]
1118 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1119 let exchange = get_exchange(
1120 Venue::new("BINANCE"),
1121 AccountType::Margin,
1122 BookType::L2_MBP,
1123 None,
1124 );
1125 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1126
1127 exchange.borrow_mut().add_instrument(instrument).unwrap();
1129
1130 let delta_buy = OrderBookDelta::new(
1132 crypto_perpetual_ethusdt.id,
1133 BookAction::Add,
1134 BookOrder::new(
1135 OrderSide::Buy,
1136 Price::from("1000.00"),
1137 Quantity::from("1.000"),
1138 1,
1139 ),
1140 0,
1141 0,
1142 UnixNanos::from(1),
1143 UnixNanos::from(1),
1144 );
1145 let delta_sell = OrderBookDelta::new(
1146 crypto_perpetual_ethusdt.id,
1147 BookAction::Add,
1148 BookOrder::new(
1149 OrderSide::Sell,
1150 Price::from("1001.00"),
1151 Quantity::from("1.000"),
1152 1,
1153 ),
1154 0,
1155 1,
1156 UnixNanos::from(2),
1157 UnixNanos::from(2),
1158 );
1159
1160 exchange.borrow_mut().process_order_book_delta(delta_buy);
1162 exchange.borrow_mut().process_order_book_delta(delta_sell);
1163
1164 let book = exchange
1165 .borrow()
1166 .get_book(crypto_perpetual_ethusdt.id)
1167 .unwrap()
1168 .clone();
1169 assert_eq!(book.update_count, 2);
1170 assert_eq!(book.sequence, 1);
1171 assert_eq!(book.ts_last, UnixNanos::from(2));
1172 let best_bid_price = exchange
1173 .borrow()
1174 .best_bid_price(crypto_perpetual_ethusdt.id);
1175 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1176 let best_ask_price = exchange
1177 .borrow()
1178 .best_ask_price(crypto_perpetual_ethusdt.id);
1179 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1180 }
1181
1182 #[rstest]
1183 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1184 let exchange = get_exchange(
1185 Venue::new("BINANCE"),
1186 AccountType::Margin,
1187 BookType::L2_MBP,
1188 None,
1189 );
1190 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1191
1192 exchange.borrow_mut().add_instrument(instrument).unwrap();
1194
1195 let delta_sell_1 = OrderBookDelta::new(
1197 crypto_perpetual_ethusdt.id,
1198 BookAction::Add,
1199 BookOrder::new(
1200 OrderSide::Sell,
1201 Price::from("1000.00"),
1202 Quantity::from("3.000"),
1203 1,
1204 ),
1205 0,
1206 0,
1207 UnixNanos::from(1),
1208 UnixNanos::from(1),
1209 );
1210 let delta_sell_2 = OrderBookDelta::new(
1211 crypto_perpetual_ethusdt.id,
1212 BookAction::Add,
1213 BookOrder::new(
1214 OrderSide::Sell,
1215 Price::from("1001.00"),
1216 Quantity::from("1.000"),
1217 1,
1218 ),
1219 0,
1220 1,
1221 UnixNanos::from(1),
1222 UnixNanos::from(1),
1223 );
1224 let orderbook_deltas = OrderBookDeltas::new(
1225 crypto_perpetual_ethusdt.id,
1226 vec![delta_sell_1, delta_sell_2],
1227 );
1228
1229 exchange
1231 .borrow_mut()
1232 .process_order_book_deltas(orderbook_deltas);
1233
1234 let book = exchange
1235 .borrow()
1236 .get_book(crypto_perpetual_ethusdt.id)
1237 .unwrap()
1238 .clone();
1239 assert_eq!(book.update_count, 2);
1240 assert_eq!(book.sequence, 1);
1241 assert_eq!(book.ts_last, UnixNanos::from(1));
1242 let best_bid_price = exchange
1243 .borrow()
1244 .best_bid_price(crypto_perpetual_ethusdt.id);
1245 assert_eq!(best_bid_price, None);
1247 let best_ask_price = exchange
1248 .borrow()
1249 .best_ask_price(crypto_perpetual_ethusdt.id);
1250 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1252 }
1253
1254 #[rstest]
1255 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1256 let exchange = get_exchange(
1257 Venue::new("BINANCE"),
1258 AccountType::Margin,
1259 BookType::L2_MBP,
1260 None,
1261 );
1262 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1263
1264 exchange.borrow_mut().add_instrument(instrument).unwrap();
1266
1267 let instrument_status = InstrumentStatus::new(
1268 crypto_perpetual_ethusdt.id,
1269 MarketStatusAction::Close, UnixNanos::from(1),
1271 UnixNanos::from(1),
1272 None,
1273 None,
1274 None,
1275 None,
1276 None,
1277 );
1278
1279 exchange
1280 .borrow_mut()
1281 .process_instrument_status(instrument_status);
1282
1283 let market_status = exchange
1284 .borrow()
1285 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1286 .unwrap()
1287 .market_status;
1288 assert_eq!(market_status, MarketStatus::Closed);
1289 }
1290
1291 #[rstest]
1292 fn test_accounting() {
1293 let account_type = AccountType::Margin;
1294 let mut cache = Cache::default();
1295 let handler = get_message_saving_handler::<AccountState>(None);
1296 msgbus::register("Portfolio.update_account".into(), handler.clone());
1297 let margin_account = MarginAccount::new(
1298 AccountState::new(
1299 AccountId::from("SIM-001"),
1300 account_type,
1301 vec![AccountBalance::new(
1302 Money::from("1000 USD"),
1303 Money::from("0 USD"),
1304 Money::from("1000 USD"),
1305 )],
1306 vec![],
1307 false,
1308 UUID4::default(),
1309 UnixNanos::default(),
1310 UnixNanos::default(),
1311 None,
1312 ),
1313 false,
1314 );
1315 let () = cache
1316 .add_account(AccountAny::Margin(margin_account))
1317 .unwrap();
1318 cache.build_index();
1320
1321 let exchange = get_exchange(
1322 Venue::new("SIM"),
1323 account_type,
1324 BookType::L2_MBP,
1325 Some(Rc::new(RefCell::new(cache))),
1326 );
1327 exchange.borrow_mut().initialize_account();
1328
1329 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1331
1332 let messages = get_saved_messages::<AccountState>(handler);
1334 assert_eq!(messages.len(), 2);
1335 let account_state_first = messages.first().unwrap();
1336 let account_state_second = messages.last().unwrap();
1337
1338 assert_eq!(account_state_first.balances.len(), 1);
1339 let current_balance = account_state_first.balances[0];
1340 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1341 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1342 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1343
1344 assert_eq!(account_state_second.balances.len(), 1);
1345 let current_balance = account_state_second.balances[0];
1346 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1347 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1348 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1349 }
1350
1351 #[rstest]
1352 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1353 let inflight1 = InflightCommand::new(
1355 UnixNanos::from(100),
1356 1,
1357 create_submit_order_command(UnixNanos::from(100)),
1358 );
1359 let inflight2 = InflightCommand::new(
1360 UnixNanos::from(200),
1361 2,
1362 create_submit_order_command(UnixNanos::from(200)),
1363 );
1364 let inflight3 = InflightCommand::new(
1365 UnixNanos::from(100),
1366 2,
1367 create_submit_order_command(UnixNanos::from(100)),
1368 );
1369
1370 let mut inflight_heap = BinaryHeap::new();
1372 inflight_heap.push(inflight1);
1373 inflight_heap.push(inflight2);
1374 inflight_heap.push(inflight3);
1375
1376 let first = inflight_heap.pop().unwrap();
1379 let second = inflight_heap.pop().unwrap();
1380 let third = inflight_heap.pop().unwrap();
1381
1382 assert_eq!(first.timestamp, UnixNanos::from(100));
1383 assert_eq!(first.counter, 1);
1384 assert_eq!(second.timestamp, UnixNanos::from(100));
1385 assert_eq!(second.counter, 2);
1386 assert_eq!(third.timestamp, UnixNanos::from(200));
1387 assert_eq!(third.counter, 2);
1388 }
1389
1390 #[rstest]
1391 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1392 let exchange = get_exchange(
1393 Venue::new("BINANCE"),
1394 AccountType::Margin,
1395 BookType::L2_MBP,
1396 None,
1397 );
1398
1399 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1400 exchange.borrow_mut().add_instrument(instrument).unwrap();
1401
1402 let command1 = create_submit_order_command(UnixNanos::from(100));
1403 let command2 = create_submit_order_command(UnixNanos::from(200));
1404
1405 exchange.borrow_mut().send(command1);
1406 exchange.borrow_mut().send(command2);
1407
1408 assert_eq!(exchange.borrow().message_queue.len(), 2);
1411 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1412
1413 exchange.borrow_mut().process(UnixNanos::from(300));
1415 assert_eq!(exchange.borrow().message_queue.len(), 0);
1416 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1417 }
1418
1419 #[rstest]
1420 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1421 let latency_model = StaticLatencyModel::new(
1424 UnixNanos::from(100),
1425 UnixNanos::from(200),
1426 UnixNanos::from(300),
1427 UnixNanos::from(100),
1428 );
1429 let exchange = get_exchange(
1430 Venue::new("BINANCE"),
1431 AccountType::Margin,
1432 BookType::L2_MBP,
1433 None,
1434 );
1435 exchange
1436 .borrow_mut()
1437 .set_latency_model(Box::new(latency_model));
1438
1439 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1440 exchange.borrow_mut().add_instrument(instrument).unwrap();
1441
1442 let command1 = create_submit_order_command(UnixNanos::from(100));
1443 let command2 = create_submit_order_command(UnixNanos::from(150));
1444 exchange.borrow_mut().send(command1);
1445 exchange.borrow_mut().send(command2);
1446
1447 assert_eq!(exchange.borrow().message_queue.len(), 0);
1449 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1450 assert_eq!(
1452 exchange
1453 .borrow()
1454 .inflight_queue
1455 .iter()
1456 .next()
1457 .unwrap()
1458 .timestamp,
1459 UnixNanos::from(400)
1460 );
1461 assert_eq!(
1463 exchange
1464 .borrow()
1465 .inflight_queue
1466 .iter()
1467 .nth(1)
1468 .unwrap()
1469 .timestamp,
1470 UnixNanos::from(450)
1471 );
1472
1473 exchange.borrow_mut().process(UnixNanos::from(420));
1475 assert_eq!(exchange.borrow().message_queue.len(), 0);
1476 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1477 assert_eq!(
1478 exchange
1479 .borrow()
1480 .inflight_queue
1481 .iter()
1482 .next()
1483 .unwrap()
1484 .timestamp,
1485 UnixNanos::from(450)
1486 );
1487 }
1488}