1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{BinaryHeap, HashMap, VecDeque},
25 fmt::Debug,
26 rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31 UnixNanos,
32 correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35 client::ExecutionClient,
36 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37 models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40 accounts::AccountAny,
41 data::{
42 Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43 QuoteTick, TradeTick,
44 },
45 enums::{AccountType, BookType, OmsType},
46 identifiers::{InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orderbook::OrderBook,
49 orders::PassiveOrderAny,
50 types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
56#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61 ts: UnixNanos,
62 counter: u32,
63 command: TradingCommand,
64}
65
66impl InflightCommand {
67 const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68 Self {
69 ts,
70 counter,
71 command,
72 }
73 }
74}
75
76impl Ord for InflightCommand {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 other
80 .ts
81 .cmp(&self.ts)
82 .then_with(|| other.counter.cmp(&self.counter))
83 }
84}
85
86impl PartialOrd for InflightCommand {
87 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92pub struct SimulatedExchange {
108 pub id: Venue,
109 pub oms_type: OmsType,
110 pub account_type: AccountType,
111 starting_balances: Vec<Money>,
112 book_type: BookType,
113 default_leverage: Decimal,
114 exec_client: Option<Rc<dyn ExecutionClient>>,
115 pub base_currency: Option<Currency>,
116 fee_model: FeeModelAny,
117 fill_model: FillModel,
118 latency_model: Option<LatencyModel>,
119 instruments: HashMap<InstrumentId, InstrumentAny>,
120 matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
121 leverages: HashMap<InstrumentId, Decimal>,
122 modules: Vec<Box<dyn SimulationModule>>,
123 clock: Rc<RefCell<dyn Clock>>,
124 cache: Rc<RefCell<Cache>>,
125 message_queue: VecDeque<TradingCommand>,
126 inflight_queue: BinaryHeap<InflightCommand>,
127 inflight_counter: HashMap<UnixNanos, u32>,
128 bar_execution: bool,
129 reject_stop_orders: bool,
130 support_gtd_orders: bool,
131 support_contingent_orders: bool,
132 use_position_ids: bool,
133 use_random_ids: bool,
134 use_reduce_only: bool,
135 use_message_queue: bool,
136 allow_cash_borrowing: bool,
137 frozen_account: bool,
138}
139
140impl Debug for SimulatedExchange {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct(stringify!(SimulatedExchange))
143 .field("id", &self.id)
144 .field("account_type", &self.account_type)
145 .finish()
146 }
147}
148
149impl SimulatedExchange {
150 #[allow(clippy::too_many_arguments)]
158 pub fn new(
159 venue: Venue,
160 oms_type: OmsType,
161 account_type: AccountType,
162 starting_balances: Vec<Money>,
163 base_currency: Option<Currency>,
164 default_leverage: Decimal,
165 leverages: HashMap<InstrumentId, Decimal>,
166 modules: Vec<Box<dyn SimulationModule>>,
167 cache: Rc<RefCell<Cache>>,
168 clock: Rc<RefCell<dyn Clock>>,
169 fill_model: FillModel,
170 fee_model: FeeModelAny,
171 book_type: BookType,
172 latency_model: Option<LatencyModel>,
173 bar_execution: Option<bool>,
174 reject_stop_orders: Option<bool>,
175 support_gtd_orders: Option<bool>,
176 support_contingent_orders: Option<bool>,
177 use_position_ids: Option<bool>,
178 use_random_ids: Option<bool>,
179 use_reduce_only: Option<bool>,
180 use_message_queue: Option<bool>,
181 allow_cash_borrowing: Option<bool>,
182 frozen_account: Option<bool>,
183 ) -> anyhow::Result<Self> {
184 if starting_balances.is_empty() {
185 anyhow::bail!("Starting balances must be provided")
186 }
187 if base_currency.is_some() && starting_balances.len() > 1 {
188 anyhow::bail!("single-currency account has multiple starting currencies")
189 }
190 Ok(Self {
192 id: venue,
193 oms_type,
194 account_type,
195 starting_balances,
196 book_type,
197 default_leverage,
198 exec_client: None,
199 base_currency,
200 fee_model,
201 fill_model,
202 latency_model,
203 instruments: HashMap::new(),
204 matching_engines: HashMap::new(),
205 leverages,
206 modules,
207 clock,
208 cache,
209 message_queue: VecDeque::new(),
210 inflight_queue: BinaryHeap::new(),
211 inflight_counter: HashMap::new(),
212 bar_execution: bar_execution.unwrap_or(true),
213 reject_stop_orders: reject_stop_orders.unwrap_or(true),
214 support_gtd_orders: support_gtd_orders.unwrap_or(true),
215 support_contingent_orders: support_contingent_orders.unwrap_or(true),
216 use_position_ids: use_position_ids.unwrap_or(true),
217 use_random_ids: use_random_ids.unwrap_or(false),
218 use_reduce_only: use_reduce_only.unwrap_or(true),
219 use_message_queue: use_message_queue.unwrap_or(true),
220 allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
221 frozen_account: frozen_account.unwrap_or(false),
222 })
223 }
224
225 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
226 self.exec_client = Some(client);
227 }
228
229 pub fn set_fill_model(&mut self, fill_model: FillModel) {
230 for matching_engine in self.matching_engines.values_mut() {
231 matching_engine.set_fill_model(fill_model.clone());
232 log::info!(
233 "Setting fill model for {} to {}",
234 matching_engine.venue,
235 self.fill_model
236 );
237 }
238 self.fill_model = fill_model;
239 }
240
241 pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
242 self.latency_model = Some(latency_model);
243 }
244
245 pub fn initialize_account(&mut self) {
246 self.generate_fresh_account_state();
247 }
248
249 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
259 check_equal(
260 &instrument.id().venue,
261 &self.id,
262 "Venue of instrument id",
263 "Venue of simulated exchange",
264 )
265 .expect(FAILED);
266
267 if self.account_type == AccountType::Cash
268 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
269 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
270 {
271 anyhow::bail!("Cash account cannot trade futures or perpetuals")
272 }
273
274 self.instruments.insert(instrument.id(), instrument.clone());
275
276 let matching_engine_config = OrderMatchingEngineConfig::new(
277 self.bar_execution,
278 self.reject_stop_orders,
279 self.support_gtd_orders,
280 self.support_contingent_orders,
281 self.use_position_ids,
282 self.use_random_ids,
283 self.use_reduce_only,
284 );
285 let instrument_id = instrument.id();
286 let matching_engine = OrderMatchingEngine::new(
287 instrument,
288 self.instruments.len() as u32,
289 self.fill_model.clone(),
290 self.fee_model.clone(),
291 self.book_type,
292 self.oms_type,
293 self.account_type,
294 self.clock.clone(),
295 Rc::clone(&self.cache),
296 matching_engine_config,
297 );
298 self.matching_engines.insert(instrument_id, matching_engine);
299
300 log::info!("Added instrument {instrument_id} and created matching engine");
301 Ok(())
302 }
303
304 #[must_use]
305 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
306 self.matching_engines
307 .get(&instrument_id)
308 .and_then(OrderMatchingEngine::best_bid_price)
309 }
310
311 #[must_use]
312 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
313 self.matching_engines
314 .get(&instrument_id)
315 .and_then(OrderMatchingEngine::best_ask_price)
316 }
317
318 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
319 self.matching_engines
320 .get(&instrument_id)
321 .map(OrderMatchingEngine::get_book)
322 }
323
324 #[must_use]
325 pub fn get_matching_engine(
326 &self,
327 instrument_id: &InstrumentId,
328 ) -> Option<&OrderMatchingEngine> {
329 self.matching_engines.get(instrument_id)
330 }
331
332 #[must_use]
333 pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
334 &self.matching_engines
335 }
336
337 #[must_use]
338 pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
339 let mut books = HashMap::new();
340 for (instrument_id, matching_engine) in &self.matching_engines {
341 books.insert(*instrument_id, matching_engine.get_book().clone());
342 }
343 books
344 }
345
346 #[must_use]
347 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
348 instrument_id
349 .and_then(|id| {
350 self.matching_engines
351 .get(&id)
352 .map(OrderMatchingEngine::get_open_orders)
353 })
354 .unwrap_or_else(|| {
355 self.matching_engines
356 .values()
357 .flat_map(OrderMatchingEngine::get_open_orders)
358 .collect()
359 })
360 }
361
362 #[must_use]
363 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
364 instrument_id
365 .and_then(|id| {
366 self.matching_engines
367 .get(&id)
368 .map(|engine| engine.get_open_bid_orders().to_vec())
369 })
370 .unwrap_or_else(|| {
371 self.matching_engines
372 .values()
373 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
374 .collect()
375 })
376 }
377
378 #[must_use]
379 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
380 instrument_id
381 .and_then(|id| {
382 self.matching_engines
383 .get(&id)
384 .map(|engine| engine.get_open_ask_orders().to_vec())
385 })
386 .unwrap_or_else(|| {
387 self.matching_engines
388 .values()
389 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
390 .collect()
391 })
392 }
393
394 #[must_use]
398 pub fn get_account(&self) -> Option<AccountAny> {
399 self.exec_client
400 .as_ref()
401 .map(|client| client.get_account().unwrap())
402 }
403
404 pub fn adjust_account(&mut self, adjustment: Money) {
408 if self.frozen_account {
409 return;
411 }
412
413 if let Some(exec_client) = &self.exec_client {
414 let venue = exec_client.venue();
415 println!("Adjusting account for venue {venue}");
416 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
417 match account.balance(Some(adjustment.currency)) {
418 Some(balance) => {
419 let mut current_balance = *balance;
420 current_balance.total += adjustment;
421 current_balance.free += adjustment;
422
423 let margins = match account {
424 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
425 _ => HashMap::new(),
426 };
427
428 if let Some(exec_client) = &self.exec_client {
429 exec_client
430 .generate_account_state(
431 vec![current_balance],
432 margins.values().copied().collect(),
433 true,
434 self.clock.borrow().timestamp_ns(),
435 )
436 .unwrap();
437 }
438 }
439 None => {
440 log::error!(
441 "Cannot adjust account: no balance for currency {}",
442 adjustment.currency
443 );
444 }
445 }
446 } else {
447 log::error!("Cannot adjust account: no account for venue {venue}");
448 }
449 }
450 }
451
452 pub fn send(&mut self, command: TradingCommand) {
453 if !self.use_message_queue {
454 self.process_trading_command(command);
455 } else if self.latency_model.is_none() {
456 self.message_queue.push_back(command);
457 } else {
458 let (ts, counter) = self.generate_inflight_command(&command);
459 self.inflight_queue
460 .push(InflightCommand::new(ts, counter, command));
461 }
462 }
463
464 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
468 if let Some(latency_model) = &self.latency_model {
469 let ts = match command {
470 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
471 command.ts_init() + latency_model.insert_latency_nanos
472 }
473 TradingCommand::ModifyOrder(_) => {
474 command.ts_init() + latency_model.update_latency_nanos
475 }
476 TradingCommand::CancelOrder(_)
477 | TradingCommand::CancelAllOrders(_)
478 | TradingCommand::BatchCancelOrders(_) => {
479 command.ts_init() + latency_model.delete_latency_nanos
480 }
481 _ => panic!("Cannot handle command: {command:?}"),
482 };
483
484 let counter = self
485 .inflight_counter
486 .entry(ts)
487 .and_modify(|e| *e += 1)
488 .or_insert(1);
489
490 (ts, *counter)
491 } else {
492 panic!("Latency model should be initialized");
493 }
494 }
495
496 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
500 for module in &self.modules {
501 module.pre_process(Data::Delta(delta));
502 }
503
504 if !self.matching_engines.contains_key(&delta.instrument_id) {
505 let instrument = {
506 let cache = self.cache.as_ref().borrow();
507 cache.instrument(&delta.instrument_id).cloned()
508 };
509
510 if let Some(instrument) = instrument {
511 self.add_instrument(instrument).unwrap();
512 } else {
513 panic!(
514 "No matching engine found for instrument {}",
515 delta.instrument_id
516 );
517 }
518 }
519
520 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
521 matching_engine.process_order_book_delta(&delta);
522 } else {
523 panic!("Matching engine should be initialized");
524 }
525 }
526
527 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
531 for module in &self.modules {
532 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
533 }
534
535 if !self.matching_engines.contains_key(&deltas.instrument_id) {
536 let instrument = {
537 let cache = self.cache.as_ref().borrow();
538 cache.instrument(&deltas.instrument_id).cloned()
539 };
540
541 if let Some(instrument) = instrument {
542 self.add_instrument(instrument).unwrap();
543 } else {
544 panic!(
545 "No matching engine found for instrument {}",
546 deltas.instrument_id
547 );
548 }
549 }
550
551 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
552 matching_engine.process_order_book_deltas(&deltas);
553 } else {
554 panic!("Matching engine should be initialized");
555 }
556 }
557
558 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
562 for module in &self.modules {
563 module.pre_process(Data::Quote(quote.to_owned()));
564 }
565
566 if !self.matching_engines.contains_key("e.instrument_id) {
567 let instrument = {
568 let cache = self.cache.as_ref().borrow();
569 cache.instrument("e.instrument_id).cloned()
570 };
571
572 if let Some(instrument) = instrument {
573 self.add_instrument(instrument).unwrap();
574 } else {
575 panic!(
576 "No matching engine found for instrument {}",
577 quote.instrument_id
578 );
579 }
580 }
581
582 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
583 matching_engine.process_quote_tick(quote);
584 } else {
585 panic!("Matching engine should be initialized");
586 }
587 }
588
589 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
593 for module in &self.modules {
594 module.pre_process(Data::Trade(trade.to_owned()));
595 }
596
597 if !self.matching_engines.contains_key(&trade.instrument_id) {
598 let instrument = {
599 let cache = self.cache.as_ref().borrow();
600 cache.instrument(&trade.instrument_id).cloned()
601 };
602
603 if let Some(instrument) = instrument {
604 self.add_instrument(instrument).unwrap();
605 } else {
606 panic!(
607 "No matching engine found for instrument {}",
608 trade.instrument_id
609 );
610 }
611 }
612
613 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
614 matching_engine.process_trade_tick(trade);
615 } else {
616 panic!("Matching engine should be initialized");
617 }
618 }
619
620 pub fn process_bar(&mut self, bar: Bar) {
624 for module in &self.modules {
625 module.pre_process(Data::Bar(bar));
626 }
627
628 if !self.matching_engines.contains_key(&bar.instrument_id()) {
629 let instrument = {
630 let cache = self.cache.as_ref().borrow();
631 cache.instrument(&bar.instrument_id()).cloned()
632 };
633
634 if let Some(instrument) = instrument {
635 self.add_instrument(instrument).unwrap();
636 } else {
637 panic!(
638 "No matching engine found for instrument {}",
639 bar.instrument_id()
640 );
641 }
642 }
643
644 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
645 matching_engine.process_bar(&bar);
646 } else {
647 panic!("Matching engine should be initialized");
648 }
649 }
650
651 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
655 if !self.matching_engines.contains_key(&status.instrument_id) {
658 let instrument = {
659 let cache = self.cache.as_ref().borrow();
660 cache.instrument(&status.instrument_id).cloned()
661 };
662
663 if let Some(instrument) = instrument {
664 self.add_instrument(instrument).unwrap();
665 } else {
666 panic!(
667 "No matching engine found for instrument {}",
668 status.instrument_id
669 );
670 }
671 }
672
673 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
674 matching_engine.process_status(status.action);
675 } else {
676 panic!("Matching engine should be initialized");
677 }
678 }
679
680 pub fn process(&mut self, ts_now: UnixNanos) {
684 while let Some(inflight) = self.inflight_queue.peek() {
688 if inflight.ts > ts_now {
689 break;
691 }
692 let inflight = self.inflight_queue.pop().unwrap();
694 self.process_trading_command(inflight.command);
695 }
696
697 while let Some(command) = self.message_queue.pop_front() {
699 self.process_trading_command(command);
700 }
701 }
702
703 pub fn reset(&mut self) {
704 for module in &self.modules {
705 module.reset();
706 }
707
708 self.generate_fresh_account_state();
709
710 for matching_engine in self.matching_engines.values_mut() {
711 matching_engine.reset();
712 }
713
714 log::info!("Resetting exchange state");
716 }
717
718 pub fn process_trading_command(&mut self, command: TradingCommand) {
722 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
723 let account_id = if let Some(exec_client) = &self.exec_client {
724 exec_client.account_id()
725 } else {
726 panic!("Execution client should be initialized");
727 };
728 match command {
729 TradingCommand::SubmitOrder(mut command) => {
730 matching_engine.process_order(&mut command.order, account_id);
731 }
732 TradingCommand::ModifyOrder(ref command) => {
733 matching_engine.process_modify(command, account_id);
734 }
735 TradingCommand::CancelOrder(ref command) => {
736 matching_engine.process_cancel(command, account_id);
737 }
738 TradingCommand::CancelAllOrders(ref command) => {
739 matching_engine.process_cancel_all(command, account_id);
740 }
741 TradingCommand::BatchCancelOrders(ref command) => {
742 matching_engine.process_batch_cancel(command, account_id);
743 }
744 TradingCommand::SubmitOrderList(mut command) => {
745 for order in &mut command.order_list.orders {
746 matching_engine.process_order(order, account_id);
747 }
748 }
749 _ => {}
750 }
751 } else {
752 panic!(
753 "Matching engine not found for instrument {}",
754 command.instrument_id()
755 );
756 }
757 }
758
759 pub fn generate_fresh_account_state(&self) {
763 let balances: Vec<AccountBalance> = self
764 .starting_balances
765 .iter()
766 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
767 .collect();
768
769 if let Some(exec_client) = &self.exec_client {
770 exec_client
771 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
772 .unwrap();
773 }
774
775 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
777 margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
778
779 for (instrument_id, leverage) in &self.leverages {
781 margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
782 }
783 }
784 }
785}
786
787#[cfg(test)]
792mod tests {
793 use std::{
794 cell::RefCell,
795 collections::{BinaryHeap, HashMap},
796 rc::Rc,
797 sync::LazyLock,
798 };
799
800 use nautilus_common::{
801 cache::Cache,
802 clock::TestClock,
803 messages::execution::{SubmitOrder, TradingCommand},
804 msgbus::{
805 self,
806 stubs::{get_message_saving_handler, get_saved_messages},
807 },
808 };
809 use nautilus_core::{AtomicTime, UUID4, UnixNanos};
810 use nautilus_execution::models::{
811 fee::{FeeModelAny, MakerTakerFeeModel},
812 fill::FillModel,
813 latency::LatencyModel,
814 };
815 use nautilus_model::{
816 accounts::{AccountAny, MarginAccount},
817 data::{
818 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
819 TradeTick,
820 },
821 enums::{
822 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
823 OmsType, OrderSide, OrderType,
824 },
825 events::AccountState,
826 identifiers::{
827 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
828 VenueOrderId,
829 },
830 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
831 orders::OrderTestBuilder,
832 types::{AccountBalance, Currency, Money, Price, Quantity},
833 };
834 use rstest::rstest;
835
836 use crate::{
837 exchange::{InflightCommand, SimulatedExchange},
838 execution_client::BacktestExecutionClient,
839 };
840
841 static ATOMIC_TIME: LazyLock<AtomicTime> =
842 LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
843
844 fn get_exchange(
845 venue: Venue,
846 account_type: AccountType,
847 book_type: BookType,
848 cache: Option<Rc<RefCell<Cache>>>,
849 ) -> Rc<RefCell<SimulatedExchange>> {
850 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
851 let clock = Rc::new(RefCell::new(TestClock::new()));
852 let exchange = Rc::new(RefCell::new(
853 SimulatedExchange::new(
854 venue,
855 OmsType::Netting,
856 account_type,
857 vec![Money::new(1000.0, Currency::USD())],
858 None,
859 1.into(),
860 HashMap::new(),
861 vec![],
862 cache.clone(),
863 clock,
864 FillModel::default(),
865 FeeModelAny::MakerTaker(MakerTakerFeeModel),
866 book_type,
867 None,
868 None,
869 None,
870 None,
871 None,
872 None,
873 None,
874 None,
875 None,
876 None,
877 None,
878 )
879 .unwrap(),
880 ));
881
882 let clock = TestClock::new();
883 let execution_client = BacktestExecutionClient::new(
884 TraderId::default(),
885 AccountId::default(),
886 exchange.clone(),
887 cache,
888 Rc::new(RefCell::new(clock)),
889 None,
890 None,
891 );
892 exchange
893 .borrow_mut()
894 .register_client(Rc::new(execution_client));
895
896 exchange
897 }
898
899 fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
900 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
901 let order = OrderTestBuilder::new(OrderType::Market)
902 .instrument_id(instrument_id)
903 .quantity(Quantity::from(1))
904 .build();
905 TradingCommand::SubmitOrder(
906 SubmitOrder::new(
907 TraderId::default(),
908 ClientId::default(),
909 StrategyId::default(),
910 instrument_id,
911 ClientOrderId::default(),
912 VenueOrderId::default(),
913 order,
914 None,
915 None,
916 UUID4::default(),
917 ts_init,
918 )
919 .unwrap(),
920 )
921 }
922
923 #[rstest]
924 #[should_panic(
925 expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
926 )]
927 fn test_venue_mismatch_between_exchange_and_instrument(
928 crypto_perpetual_ethusdt: CryptoPerpetual,
929 ) {
930 let exchange = get_exchange(
931 Venue::new("SIM"),
932 AccountType::Margin,
933 BookType::L1_MBP,
934 None,
935 );
936 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
937 exchange.borrow_mut().add_instrument(instrument).unwrap();
938 }
939
940 #[rstest]
941 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
942 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
943 let exchange = get_exchange(
944 Venue::new("BINANCE"),
945 AccountType::Cash,
946 BookType::L1_MBP,
947 None,
948 );
949 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
950 exchange.borrow_mut().add_instrument(instrument).unwrap();
951 }
952
953 #[rstest]
954 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
955 let exchange = get_exchange(
956 Venue::new("BINANCE"),
957 AccountType::Margin,
958 BookType::L1_MBP,
959 None,
960 );
961 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
962
963 exchange.borrow_mut().add_instrument(instrument).unwrap();
965
966 let quote_tick = QuoteTick::new(
968 crypto_perpetual_ethusdt.id,
969 Price::from("1000"),
970 Price::from("1001"),
971 Quantity::from(1),
972 Quantity::from(1),
973 UnixNanos::default(),
974 UnixNanos::default(),
975 );
976 exchange.borrow_mut().process_quote_tick("e_tick);
977
978 let best_bid_price = exchange
979 .borrow()
980 .best_bid_price(crypto_perpetual_ethusdt.id);
981 assert_eq!(best_bid_price, Some(Price::from("1000")));
982 let best_ask_price = exchange
983 .borrow()
984 .best_ask_price(crypto_perpetual_ethusdt.id);
985 assert_eq!(best_ask_price, Some(Price::from("1001")));
986 }
987
988 #[rstest]
989 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
990 let exchange = get_exchange(
991 Venue::new("BINANCE"),
992 AccountType::Margin,
993 BookType::L1_MBP,
994 None,
995 );
996 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
997
998 exchange.borrow_mut().add_instrument(instrument).unwrap();
1000
1001 let trade_tick = TradeTick::new(
1003 crypto_perpetual_ethusdt.id,
1004 Price::from("1000"),
1005 Quantity::from(1),
1006 AggressorSide::Buyer,
1007 TradeId::from("1"),
1008 UnixNanos::default(),
1009 UnixNanos::default(),
1010 );
1011 exchange.borrow_mut().process_trade_tick(&trade_tick);
1012
1013 let best_bid_price = exchange
1014 .borrow()
1015 .best_bid_price(crypto_perpetual_ethusdt.id);
1016 assert_eq!(best_bid_price, Some(Price::from("1000")));
1017 let best_ask = exchange
1018 .borrow()
1019 .best_ask_price(crypto_perpetual_ethusdt.id);
1020 assert_eq!(best_ask, Some(Price::from("1000")));
1021 }
1022
1023 #[rstest]
1024 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1025 let exchange = get_exchange(
1026 Venue::new("BINANCE"),
1027 AccountType::Margin,
1028 BookType::L1_MBP,
1029 None,
1030 );
1031 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1032
1033 exchange.borrow_mut().add_instrument(instrument).unwrap();
1035
1036 let bar = Bar::new(
1038 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1039 Price::from("1500.00"),
1040 Price::from("1505.00"),
1041 Price::from("1490.00"),
1042 Price::from("1502.00"),
1043 Quantity::from(100),
1044 UnixNanos::default(),
1045 UnixNanos::default(),
1046 );
1047 exchange.borrow_mut().process_bar(bar);
1048
1049 let best_bid_price = exchange
1051 .borrow()
1052 .best_bid_price(crypto_perpetual_ethusdt.id);
1053 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1054 let best_ask_price = exchange
1055 .borrow()
1056 .best_ask_price(crypto_perpetual_ethusdt.id);
1057 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1058 }
1059
1060 #[rstest]
1061 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1062 let exchange = get_exchange(
1063 Venue::new("BINANCE"),
1064 AccountType::Margin,
1065 BookType::L1_MBP,
1066 None,
1067 );
1068 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1069
1070 exchange.borrow_mut().add_instrument(instrument).unwrap();
1072
1073 let bar_bid = Bar::new(
1076 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1077 Price::from("1500.00"),
1078 Price::from("1505.00"),
1079 Price::from("1490.00"),
1080 Price::from("1502.00"),
1081 Quantity::from(100),
1082 UnixNanos::from(1),
1083 UnixNanos::from(1),
1084 );
1085 let bar_ask = Bar::new(
1086 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1087 Price::from("1501.00"),
1088 Price::from("1506.00"),
1089 Price::from("1491.00"),
1090 Price::from("1503.00"),
1091 Quantity::from(100),
1092 UnixNanos::from(1),
1093 UnixNanos::from(1),
1094 );
1095
1096 exchange.borrow_mut().process_bar(bar_bid);
1098 exchange.borrow_mut().process_bar(bar_ask);
1099
1100 let best_bid_price = exchange
1102 .borrow()
1103 .best_bid_price(crypto_perpetual_ethusdt.id);
1104 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1105 let best_ask_price = exchange
1106 .borrow()
1107 .best_ask_price(crypto_perpetual_ethusdt.id);
1108 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1109 }
1110
1111 #[rstest]
1112 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1113 let exchange = get_exchange(
1114 Venue::new("BINANCE"),
1115 AccountType::Margin,
1116 BookType::L2_MBP,
1117 None,
1118 );
1119 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1120
1121 exchange.borrow_mut().add_instrument(instrument).unwrap();
1123
1124 let delta_buy = OrderBookDelta::new(
1126 crypto_perpetual_ethusdt.id,
1127 BookAction::Add,
1128 BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1129 0,
1130 0,
1131 UnixNanos::from(1),
1132 UnixNanos::from(1),
1133 );
1134 let delta_sell = OrderBookDelta::new(
1135 crypto_perpetual_ethusdt.id,
1136 BookAction::Add,
1137 BookOrder::new(
1138 OrderSide::Sell,
1139 Price::from("1001.00"),
1140 Quantity::from(1),
1141 1,
1142 ),
1143 0,
1144 1,
1145 UnixNanos::from(2),
1146 UnixNanos::from(2),
1147 );
1148
1149 exchange.borrow_mut().process_order_book_delta(delta_buy);
1151 exchange.borrow_mut().process_order_book_delta(delta_sell);
1152
1153 let book = exchange
1154 .borrow()
1155 .get_book(crypto_perpetual_ethusdt.id)
1156 .unwrap()
1157 .clone();
1158 assert_eq!(book.update_count, 2);
1159 assert_eq!(book.sequence, 1);
1160 assert_eq!(book.ts_last, UnixNanos::from(2));
1161 let best_bid_price = exchange
1162 .borrow()
1163 .best_bid_price(crypto_perpetual_ethusdt.id);
1164 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1165 let best_ask_price = exchange
1166 .borrow()
1167 .best_ask_price(crypto_perpetual_ethusdt.id);
1168 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1169 }
1170
1171 #[rstest]
1172 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1173 let exchange = get_exchange(
1174 Venue::new("BINANCE"),
1175 AccountType::Margin,
1176 BookType::L2_MBP,
1177 None,
1178 );
1179 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1180
1181 exchange.borrow_mut().add_instrument(instrument).unwrap();
1183
1184 let delta_sell_1 = OrderBookDelta::new(
1186 crypto_perpetual_ethusdt.id,
1187 BookAction::Add,
1188 BookOrder::new(
1189 OrderSide::Sell,
1190 Price::from("1000.00"),
1191 Quantity::from(3),
1192 1,
1193 ),
1194 0,
1195 0,
1196 UnixNanos::from(1),
1197 UnixNanos::from(1),
1198 );
1199 let delta_sell_2 = OrderBookDelta::new(
1200 crypto_perpetual_ethusdt.id,
1201 BookAction::Add,
1202 BookOrder::new(
1203 OrderSide::Sell,
1204 Price::from("1001.00"),
1205 Quantity::from(1),
1206 1,
1207 ),
1208 0,
1209 1,
1210 UnixNanos::from(1),
1211 UnixNanos::from(1),
1212 );
1213 let orderbook_deltas = OrderBookDeltas::new(
1214 crypto_perpetual_ethusdt.id,
1215 vec![delta_sell_1, delta_sell_2],
1216 );
1217
1218 exchange
1220 .borrow_mut()
1221 .process_order_book_deltas(orderbook_deltas);
1222
1223 let book = exchange
1224 .borrow()
1225 .get_book(crypto_perpetual_ethusdt.id)
1226 .unwrap()
1227 .clone();
1228 assert_eq!(book.update_count, 2);
1229 assert_eq!(book.sequence, 1);
1230 assert_eq!(book.ts_last, UnixNanos::from(1));
1231 let best_bid_price = exchange
1232 .borrow()
1233 .best_bid_price(crypto_perpetual_ethusdt.id);
1234 assert_eq!(best_bid_price, None);
1236 let best_ask_price = exchange
1237 .borrow()
1238 .best_ask_price(crypto_perpetual_ethusdt.id);
1239 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1241 }
1242
1243 #[rstest]
1244 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1245 let exchange = get_exchange(
1246 Venue::new("BINANCE"),
1247 AccountType::Margin,
1248 BookType::L2_MBP,
1249 None,
1250 );
1251 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1252
1253 exchange.borrow_mut().add_instrument(instrument).unwrap();
1255
1256 let instrument_status = InstrumentStatus::new(
1257 crypto_perpetual_ethusdt.id,
1258 MarketStatusAction::Close, UnixNanos::from(1),
1260 UnixNanos::from(1),
1261 None,
1262 None,
1263 None,
1264 None,
1265 None,
1266 );
1267
1268 exchange
1269 .borrow_mut()
1270 .process_instrument_status(instrument_status);
1271
1272 let market_status = exchange
1273 .borrow()
1274 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1275 .unwrap()
1276 .market_status;
1277 assert_eq!(market_status, MarketStatus::Closed);
1278 }
1279
1280 #[rstest]
1281 fn test_accounting() {
1282 let account_type = AccountType::Margin;
1283 let mut cache = Cache::default();
1284 let handler = get_message_saving_handler::<AccountState>(None);
1285 msgbus::register("Portfolio.update_account".into(), handler.clone());
1286 let margin_account = MarginAccount::new(
1287 AccountState::new(
1288 AccountId::from("SIM-001"),
1289 account_type,
1290 vec![AccountBalance::new(
1291 Money::from("1000 USD"),
1292 Money::from("0 USD"),
1293 Money::from("1000 USD"),
1294 )],
1295 vec![],
1296 false,
1297 UUID4::default(),
1298 UnixNanos::default(),
1299 UnixNanos::default(),
1300 None,
1301 ),
1302 false,
1303 );
1304 let () = cache
1305 .add_account(AccountAny::Margin(margin_account))
1306 .unwrap();
1307 cache.build_index();
1309
1310 let exchange = get_exchange(
1311 Venue::new("SIM"),
1312 account_type,
1313 BookType::L2_MBP,
1314 Some(Rc::new(RefCell::new(cache))),
1315 );
1316 exchange.borrow_mut().initialize_account();
1317
1318 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1320
1321 let messages = get_saved_messages::<AccountState>(handler);
1323 assert_eq!(messages.len(), 2);
1324 let account_state_first = messages.first().unwrap();
1325 let account_state_second = messages.last().unwrap();
1326
1327 assert_eq!(account_state_first.balances.len(), 1);
1328 let current_balance = account_state_first.balances[0];
1329 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1330 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1331 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1332
1333 assert_eq!(account_state_second.balances.len(), 1);
1334 let current_balance = account_state_second.balances[0];
1335 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1336 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1337 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1338 }
1339
1340 #[rstest]
1341 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1342 let inflight1 = InflightCommand::new(
1344 UnixNanos::from(100),
1345 1,
1346 create_submit_order_command(UnixNanos::from(100)),
1347 );
1348 let inflight2 = InflightCommand::new(
1349 UnixNanos::from(200),
1350 2,
1351 create_submit_order_command(UnixNanos::from(200)),
1352 );
1353 let inflight3 = InflightCommand::new(
1354 UnixNanos::from(100),
1355 2,
1356 create_submit_order_command(UnixNanos::from(100)),
1357 );
1358
1359 let mut inflight_heap = BinaryHeap::new();
1361 inflight_heap.push(inflight1);
1362 inflight_heap.push(inflight2);
1363 inflight_heap.push(inflight3);
1364
1365 let first = inflight_heap.pop().unwrap();
1368 let second = inflight_heap.pop().unwrap();
1369 let third = inflight_heap.pop().unwrap();
1370
1371 assert_eq!(first.ts, UnixNanos::from(100));
1372 assert_eq!(first.counter, 1);
1373 assert_eq!(second.ts, UnixNanos::from(100));
1374 assert_eq!(second.counter, 2);
1375 assert_eq!(third.ts, UnixNanos::from(200));
1376 assert_eq!(third.counter, 2);
1377 }
1378
1379 #[rstest]
1380 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1381 let exchange = get_exchange(
1382 Venue::new("BINANCE"),
1383 AccountType::Margin,
1384 BookType::L2_MBP,
1385 None,
1386 );
1387
1388 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1389 exchange.borrow_mut().add_instrument(instrument).unwrap();
1390
1391 let command1 = create_submit_order_command(UnixNanos::from(100));
1392 let command2 = create_submit_order_command(UnixNanos::from(200));
1393
1394 exchange.borrow_mut().send(command1);
1395 exchange.borrow_mut().send(command2);
1396
1397 assert_eq!(exchange.borrow().message_queue.len(), 2);
1400 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1401
1402 exchange.borrow_mut().process(UnixNanos::from(300));
1404 assert_eq!(exchange.borrow().message_queue.len(), 0);
1405 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1406 }
1407
1408 #[rstest]
1409 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1410 let latency_model = LatencyModel::new(
1411 UnixNanos::from(100),
1412 UnixNanos::from(200),
1413 UnixNanos::from(300),
1414 UnixNanos::from(100),
1415 );
1416 let exchange = get_exchange(
1417 Venue::new("BINANCE"),
1418 AccountType::Margin,
1419 BookType::L2_MBP,
1420 None,
1421 );
1422 exchange.borrow_mut().set_latency_model(latency_model);
1423
1424 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1425 exchange.borrow_mut().add_instrument(instrument).unwrap();
1426
1427 let command1 = create_submit_order_command(UnixNanos::from(100));
1428 let command2 = create_submit_order_command(UnixNanos::from(150));
1429 exchange.borrow_mut().send(command1);
1430 exchange.borrow_mut().send(command2);
1431
1432 assert_eq!(exchange.borrow().message_queue.len(), 0);
1434 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1435 assert_eq!(
1437 exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1438 UnixNanos::from(300)
1439 );
1440 assert_eq!(
1442 exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1443 UnixNanos::from(350)
1444 );
1445
1446 exchange.borrow_mut().process(UnixNanos::from(320));
1448 assert_eq!(exchange.borrow().message_queue.len(), 0);
1449 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1450 assert_eq!(
1451 exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1452 UnixNanos::from(350)
1453 );
1454 }
1455}