1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{BinaryHeap, HashMap, VecDeque},
25 rc::Rc,
26};
27
28use nautilus_common::{cache::Cache, clock::Clock};
29use nautilus_core::{
30 UnixNanos,
31 correctness::{FAILED, check_equal},
32};
33use nautilus_execution::{
34 client::ExecutionClient,
35 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
36 messages::TradingCommand,
37 models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40 accounts::AccountAny,
41 data::{
42 Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43 QuoteTick, TradeTick,
44 },
45 enums::{AccountType, BookType, OmsType},
46 identifiers::{InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orderbook::OrderBook,
49 orders::PassiveOrderAny,
50 types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
56#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61 ts: UnixNanos,
62 counter: u32,
63 command: TradingCommand,
64}
65
66impl InflightCommand {
67 const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68 Self {
69 ts,
70 counter,
71 command,
72 }
73 }
74}
75
76impl Ord for InflightCommand {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 other
80 .ts
81 .cmp(&self.ts)
82 .then_with(|| other.counter.cmp(&self.counter))
83 }
84}
85
86impl PartialOrd for InflightCommand {
87 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92pub struct SimulatedExchange {
93 pub id: Venue,
94 pub oms_type: OmsType,
95 pub account_type: AccountType,
96 starting_balances: Vec<Money>,
97 book_type: BookType,
98 default_leverage: Decimal,
99 exec_client: Option<Rc<dyn ExecutionClient>>,
100 pub base_currency: Option<Currency>,
101 fee_model: FeeModelAny,
102 fill_model: FillModel,
103 latency_model: Option<LatencyModel>,
104 instruments: HashMap<InstrumentId, InstrumentAny>,
105 matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
106 leverages: HashMap<InstrumentId, Decimal>,
107 modules: Vec<Box<dyn SimulationModule>>,
108 clock: Rc<RefCell<dyn Clock>>,
109 cache: Rc<RefCell<Cache>>,
110 message_queue: VecDeque<TradingCommand>,
111 inflight_queue: BinaryHeap<InflightCommand>,
112 inflight_counter: HashMap<UnixNanos, u32>,
113 frozen_account: bool,
114 bar_execution: bool,
115 reject_stop_orders: bool,
116 support_gtd_orders: bool,
117 support_contingent_orders: bool,
118 use_position_ids: bool,
119 use_random_ids: bool,
120 use_reduce_only: bool,
121 use_message_queue: bool,
122}
123
124impl SimulatedExchange {
125 #[allow(clippy::too_many_arguments)]
127 pub fn new(
128 venue: Venue,
129 oms_type: OmsType,
130 account_type: AccountType,
131 starting_balances: Vec<Money>,
132 base_currency: Option<Currency>,
133 default_leverage: Decimal,
134 leverages: HashMap<InstrumentId, Decimal>,
135 modules: Vec<Box<dyn SimulationModule>>,
136 cache: Rc<RefCell<Cache>>,
137 clock: Rc<RefCell<dyn Clock>>,
138 fill_model: FillModel,
139 fee_model: FeeModelAny,
140 book_type: BookType,
141 latency_model: Option<LatencyModel>,
142 frozen_account: Option<bool>,
143 bar_execution: Option<bool>,
144 reject_stop_orders: Option<bool>,
145 support_gtd_orders: Option<bool>,
146 support_contingent_orders: Option<bool>,
147 use_position_ids: Option<bool>,
148 use_random_ids: Option<bool>,
149 use_reduce_only: Option<bool>,
150 use_message_queue: Option<bool>,
151 ) -> anyhow::Result<Self> {
152 if starting_balances.is_empty() {
153 anyhow::bail!("Starting balances must be provided")
154 }
155 if base_currency.is_some() && starting_balances.len() > 1 {
156 anyhow::bail!("single-currency account has multiple starting currencies")
157 }
158 Ok(Self {
160 id: venue,
161 oms_type,
162 account_type,
163 starting_balances,
164 book_type,
165 default_leverage,
166 exec_client: None,
167 base_currency,
168 fee_model,
169 fill_model,
170 latency_model,
171 instruments: HashMap::new(),
172 matching_engines: HashMap::new(),
173 leverages,
174 modules,
175 clock,
176 cache,
177 message_queue: VecDeque::new(),
178 inflight_queue: BinaryHeap::new(),
179 inflight_counter: HashMap::new(),
180 frozen_account: frozen_account.unwrap_or(false),
181 bar_execution: bar_execution.unwrap_or(true),
182 reject_stop_orders: reject_stop_orders.unwrap_or(true),
183 support_gtd_orders: support_gtd_orders.unwrap_or(true),
184 support_contingent_orders: support_contingent_orders.unwrap_or(true),
185 use_position_ids: use_position_ids.unwrap_or(true),
186 use_random_ids: use_random_ids.unwrap_or(false),
187 use_reduce_only: use_reduce_only.unwrap_or(true),
188 use_message_queue: use_message_queue.unwrap_or(true),
189 })
190 }
191
192 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
193 self.exec_client = Some(client);
194 }
195
196 pub fn set_fill_model(&mut self, fill_model: FillModel) {
197 for matching_engine in self.matching_engines.values_mut() {
198 matching_engine.set_fill_model(fill_model.clone());
199 log::info!(
200 "Setting fill model for {} to {}",
201 matching_engine.venue,
202 self.fill_model
203 );
204 }
205 self.fill_model = fill_model;
206 }
207
208 pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
209 self.latency_model = Some(latency_model);
210 }
211
212 pub fn initialize_account(&mut self) {
213 self.generate_fresh_account_state();
214 }
215
216 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
217 check_equal(
218 instrument.id().venue,
219 self.id,
220 "Venue of instrument id",
221 "Venue of simulated exchange",
222 )
223 .expect(FAILED);
224
225 if self.account_type == AccountType::Cash
226 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
227 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
228 {
229 anyhow::bail!("Cash account cannot trade futures or perpetuals")
230 }
231
232 self.instruments.insert(instrument.id(), instrument.clone());
233
234 let matching_engine_config = OrderMatchingEngineConfig::new(
235 self.bar_execution,
236 self.reject_stop_orders,
237 self.support_gtd_orders,
238 self.support_contingent_orders,
239 self.use_position_ids,
240 self.use_random_ids,
241 self.use_reduce_only,
242 );
243 let instrument_id = instrument.id();
244 let matching_engine = OrderMatchingEngine::new(
245 instrument,
246 self.instruments.len() as u32,
247 self.fill_model.clone(),
248 self.fee_model.clone(),
249 self.book_type,
250 self.oms_type,
251 self.account_type,
252 self.clock.clone(),
253 Rc::clone(&self.cache),
254 matching_engine_config,
255 );
256 self.matching_engines.insert(instrument_id, matching_engine);
257
258 log::info!("Added instrument {instrument_id} and created matching engine");
259 Ok(())
260 }
261
262 #[must_use]
263 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
264 self.matching_engines
265 .get(&instrument_id)
266 .and_then(OrderMatchingEngine::best_bid_price)
267 }
268
269 #[must_use]
270 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
271 self.matching_engines
272 .get(&instrument_id)
273 .and_then(OrderMatchingEngine::best_ask_price)
274 }
275
276 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
277 self.matching_engines
278 .get(&instrument_id)
279 .map(OrderMatchingEngine::get_book)
280 }
281
282 #[must_use]
283 pub fn get_matching_engine(
284 &self,
285 instrument_id: &InstrumentId,
286 ) -> Option<&OrderMatchingEngine> {
287 self.matching_engines.get(instrument_id)
288 }
289
290 #[must_use]
291 pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
292 &self.matching_engines
293 }
294
295 #[must_use]
296 pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
297 let mut books = HashMap::new();
298 for (instrument_id, matching_engine) in &self.matching_engines {
299 books.insert(*instrument_id, matching_engine.get_book().clone());
300 }
301 books
302 }
303
304 #[must_use]
305 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
306 instrument_id
307 .and_then(|id| {
308 self.matching_engines
309 .get(&id)
310 .map(OrderMatchingEngine::get_open_orders)
311 })
312 .unwrap_or_else(|| {
313 self.matching_engines
314 .values()
315 .flat_map(OrderMatchingEngine::get_open_orders)
316 .collect()
317 })
318 }
319
320 #[must_use]
321 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
322 instrument_id
323 .and_then(|id| {
324 self.matching_engines
325 .get(&id)
326 .map(|engine| engine.get_open_bid_orders().to_vec())
327 })
328 .unwrap_or_else(|| {
329 self.matching_engines
330 .values()
331 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
332 .collect()
333 })
334 }
335
336 #[must_use]
337 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
338 instrument_id
339 .and_then(|id| {
340 self.matching_engines
341 .get(&id)
342 .map(|engine| engine.get_open_ask_orders().to_vec())
343 })
344 .unwrap_or_else(|| {
345 self.matching_engines
346 .values()
347 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
348 .collect()
349 })
350 }
351
352 #[must_use]
353 pub fn get_account(&self) -> Option<AccountAny> {
354 self.exec_client
355 .as_ref()
356 .map(|client| client.get_account().unwrap())
357 }
358
359 pub fn adjust_account(&mut self, adjustment: Money) {
360 if self.frozen_account {
361 return;
363 }
364
365 if let Some(exec_client) = &self.exec_client {
366 let venue = exec_client.venue();
367 println!("Adjusting account for venue {venue}");
368 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
369 match account.balance(Some(adjustment.currency)) {
370 Some(balance) => {
371 let mut current_balance = *balance;
372 current_balance.total += adjustment;
373 current_balance.free += adjustment;
374
375 let margins = match account {
376 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
377 _ => HashMap::new(),
378 };
379
380 if let Some(exec_client) = &self.exec_client {
381 exec_client
382 .generate_account_state(
383 vec![current_balance],
384 margins.values().copied().collect(),
385 true,
386 self.clock.borrow().timestamp_ns(),
387 )
388 .unwrap();
389 }
390 }
391 None => {
392 log::error!(
393 "Cannot adjust account: no balance for currency {}",
394 adjustment.currency
395 );
396 }
397 }
398 } else {
399 log::error!("Cannot adjust account: no account for venue {venue}");
400 }
401 }
402 }
403
404 pub fn send(&mut self, command: TradingCommand) {
405 if !self.use_message_queue {
406 self.process_trading_command(command);
407 } else if self.latency_model.is_none() {
408 self.message_queue.push_back(command);
409 } else {
410 let (ts, counter) = self.generate_inflight_command(&command);
411 self.inflight_queue
412 .push(InflightCommand::new(ts, counter, command));
413 }
414 }
415
416 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
417 if let Some(latency_model) = &self.latency_model {
418 let ts = match command {
419 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
420 command.ts_init() + latency_model.insert_latency_nanos
421 }
422 TradingCommand::ModifyOrder(_) => {
423 command.ts_init() + latency_model.update_latency_nanos
424 }
425 TradingCommand::CancelOrder(_)
426 | TradingCommand::CancelAllOrders(_)
427 | TradingCommand::BatchCancelOrders(_) => {
428 command.ts_init() + latency_model.delete_latency_nanos
429 }
430 _ => panic!("Invalid command was {command}"),
431 };
432
433 let counter = self
434 .inflight_counter
435 .entry(ts)
436 .and_modify(|e| *e += 1)
437 .or_insert(1);
438
439 (ts, *counter)
440 } else {
441 panic!("Latency model should be initialized");
442 }
443 }
444
445 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
446 for module in &self.modules {
447 module.pre_process(Data::Delta(delta));
448 }
449
450 if !self.matching_engines.contains_key(&delta.instrument_id) {
451 let instrument = {
452 let cache = self.cache.as_ref().borrow();
453 cache.instrument(&delta.instrument_id).cloned()
454 };
455
456 if let Some(instrument) = instrument {
457 self.add_instrument(instrument).unwrap();
458 } else {
459 panic!(
460 "No matching engine found for instrument {}",
461 delta.instrument_id
462 );
463 }
464 }
465
466 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
467 matching_engine.process_order_book_delta(&delta);
468 } else {
469 panic!("Matching engine should be initialized");
470 }
471 }
472
473 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
474 for module in &self.modules {
475 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
476 }
477
478 if !self.matching_engines.contains_key(&deltas.instrument_id) {
479 let instrument = {
480 let cache = self.cache.as_ref().borrow();
481 cache.instrument(&deltas.instrument_id).cloned()
482 };
483
484 if let Some(instrument) = instrument {
485 self.add_instrument(instrument).unwrap();
486 } else {
487 panic!(
488 "No matching engine found for instrument {}",
489 deltas.instrument_id
490 );
491 }
492 }
493
494 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
495 matching_engine.process_order_book_deltas(&deltas);
496 } else {
497 panic!("Matching engine should be initialized");
498 }
499 }
500
501 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
502 for module in &self.modules {
503 module.pre_process(Data::Quote(quote.to_owned()));
504 }
505
506 if !self.matching_engines.contains_key("e.instrument_id) {
507 let instrument = {
508 let cache = self.cache.as_ref().borrow();
509 cache.instrument("e.instrument_id).cloned()
510 };
511
512 if let Some(instrument) = instrument {
513 self.add_instrument(instrument).unwrap();
514 } else {
515 panic!(
516 "No matching engine found for instrument {}",
517 quote.instrument_id
518 );
519 }
520 }
521
522 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
523 matching_engine.process_quote_tick(quote);
524 } else {
525 panic!("Matching engine should be initialized");
526 }
527 }
528
529 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
530 for module in &self.modules {
531 module.pre_process(Data::Trade(trade.to_owned()));
532 }
533
534 if !self.matching_engines.contains_key(&trade.instrument_id) {
535 let instrument = {
536 let cache = self.cache.as_ref().borrow();
537 cache.instrument(&trade.instrument_id).cloned()
538 };
539
540 if let Some(instrument) = instrument {
541 self.add_instrument(instrument).unwrap();
542 } else {
543 panic!(
544 "No matching engine found for instrument {}",
545 trade.instrument_id
546 );
547 }
548 }
549
550 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
551 matching_engine.process_trade_tick(trade);
552 } else {
553 panic!("Matching engine should be initialized");
554 }
555 }
556
557 pub fn process_bar(&mut self, bar: Bar) {
558 for module in &self.modules {
559 module.pre_process(Data::Bar(bar));
560 }
561
562 if !self.matching_engines.contains_key(&bar.instrument_id()) {
563 let instrument = {
564 let cache = self.cache.as_ref().borrow();
565 cache.instrument(&bar.instrument_id()).cloned()
566 };
567
568 if let Some(instrument) = instrument {
569 self.add_instrument(instrument).unwrap();
570 } else {
571 panic!(
572 "No matching engine found for instrument {}",
573 bar.instrument_id()
574 );
575 }
576 }
577
578 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
579 matching_engine.process_bar(&bar);
580 } else {
581 panic!("Matching engine should be initialized");
582 }
583 }
584
585 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
586 if !self.matching_engines.contains_key(&status.instrument_id) {
589 let instrument = {
590 let cache = self.cache.as_ref().borrow();
591 cache.instrument(&status.instrument_id).cloned()
592 };
593
594 if let Some(instrument) = instrument {
595 self.add_instrument(instrument).unwrap();
596 } else {
597 panic!(
598 "No matching engine found for instrument {}",
599 status.instrument_id
600 );
601 }
602 }
603
604 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
605 matching_engine.process_status(status.action);
606 } else {
607 panic!("Matching engine should be initialized");
608 }
609 }
610
611 pub fn process(&mut self, ts_now: UnixNanos) {
612 while let Some(inflight) = self.inflight_queue.peek() {
616 if inflight.ts > ts_now {
617 break;
619 }
620 let inflight = self.inflight_queue.pop().unwrap();
622 self.process_trading_command(inflight.command);
623 }
624
625 while let Some(command) = self.message_queue.pop_front() {
627 self.process_trading_command(command);
628 }
629 }
630
631 pub fn reset(&mut self) {
632 for module in &self.modules {
633 module.reset();
634 }
635
636 self.generate_fresh_account_state();
637
638 for matching_engine in self.matching_engines.values_mut() {
639 matching_engine.reset();
640 }
641
642 log::info!("Resetting exchange state");
644 }
645
646 pub fn process_trading_command(&mut self, command: TradingCommand) {
647 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
648 let account_id = if let Some(exec_client) = &self.exec_client {
649 exec_client.account_id()
650 } else {
651 panic!("Execution client should be initialized");
652 };
653 match command {
654 TradingCommand::SubmitOrder(mut command) => {
655 matching_engine.process_order(&mut command.order, account_id);
656 }
657 TradingCommand::ModifyOrder(ref command) => {
658 matching_engine.process_modify(command, account_id);
659 }
660 TradingCommand::CancelOrder(ref command) => {
661 matching_engine.process_cancel(command, account_id);
662 }
663 TradingCommand::CancelAllOrders(ref command) => {
664 matching_engine.process_cancel_all(command, account_id);
665 }
666 TradingCommand::BatchCancelOrders(ref command) => {
667 matching_engine.process_batch_cancel(command, account_id);
668 }
669 TradingCommand::SubmitOrderList(mut command) => {
670 for order in &mut command.order_list.orders {
671 matching_engine.process_order(order, account_id);
672 }
673 }
674 _ => {}
675 }
676 } else {
677 panic!("Matching engine should be initialized");
678 }
679 }
680
681 pub fn generate_fresh_account_state(&self) {
682 let balances: Vec<AccountBalance> = self
683 .starting_balances
684 .iter()
685 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
686 .collect();
687
688 if let Some(exec_client) = &self.exec_client {
689 exec_client
690 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
691 .unwrap();
692 }
693
694 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
696 margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
697
698 for (instrument_id, leverage) in &self.leverages {
700 margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
701 }
702 }
703 }
704}
705
706#[cfg(test)]
710mod tests {
711 use std::{
712 cell::RefCell,
713 collections::{BinaryHeap, HashMap},
714 rc::Rc,
715 sync::LazyLock,
716 };
717
718 use nautilus_common::{
719 cache::Cache,
720 clock::TestClock,
721 msgbus::{
722 self,
723 stubs::{get_message_saving_handler, get_saved_messages},
724 },
725 };
726 use nautilus_core::{AtomicTime, UUID4, UnixNanos};
727 use nautilus_execution::{
728 messages::{SubmitOrder, TradingCommand},
729 models::{
730 fee::{FeeModelAny, MakerTakerFeeModel},
731 fill::FillModel,
732 latency::LatencyModel,
733 },
734 };
735 use nautilus_model::{
736 accounts::{AccountAny, MarginAccount},
737 data::{
738 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
739 TradeTick,
740 },
741 enums::{
742 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
743 OmsType, OrderSide, OrderType,
744 },
745 events::AccountState,
746 identifiers::{
747 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
748 VenueOrderId,
749 },
750 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
751 orders::OrderTestBuilder,
752 types::{AccountBalance, Currency, Money, Price, Quantity},
753 };
754 use rstest::rstest;
755 use ustr::Ustr;
756
757 use crate::{
758 exchange::{InflightCommand, SimulatedExchange},
759 execution_client::BacktestExecutionClient,
760 };
761
762 static ATOMIC_TIME: LazyLock<AtomicTime> =
763 LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
764
765 fn get_exchange(
766 venue: Venue,
767 account_type: AccountType,
768 book_type: BookType,
769 cache: Option<Rc<RefCell<Cache>>>,
770 ) -> Rc<RefCell<SimulatedExchange>> {
771 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
772 let clock = Rc::new(RefCell::new(TestClock::new()));
773 let exchange = Rc::new(RefCell::new(
774 SimulatedExchange::new(
775 venue,
776 OmsType::Netting,
777 account_type,
778 vec![Money::new(1000.0, Currency::USD())],
779 None,
780 1.into(),
781 HashMap::new(),
782 vec![],
783 cache.clone(),
784 clock,
785 FillModel::default(),
786 FeeModelAny::MakerTaker(MakerTakerFeeModel),
787 book_type,
788 None,
789 None,
790 None,
791 None,
792 None,
793 None,
794 None,
795 None,
796 None,
797 None,
798 )
799 .unwrap(),
800 ));
801
802 let clock = TestClock::new();
803 let execution_client = BacktestExecutionClient::new(
804 TraderId::default(),
805 AccountId::default(),
806 exchange.clone(),
807 cache.clone(),
808 Rc::new(RefCell::new(clock)),
809 None,
810 None,
811 );
812 exchange
813 .borrow_mut()
814 .register_client(Rc::new(execution_client));
815
816 exchange
817 }
818
819 fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
820 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
821 let order = OrderTestBuilder::new(OrderType::Market)
822 .instrument_id(instrument_id)
823 .quantity(Quantity::from(1))
824 .build();
825 TradingCommand::SubmitOrder(
826 SubmitOrder::new(
827 TraderId::default(),
828 ClientId::default(),
829 StrategyId::default(),
830 instrument_id,
831 ClientOrderId::default(),
832 VenueOrderId::default(),
833 order,
834 None,
835 None,
836 UUID4::default(),
837 ts_init,
838 )
839 .unwrap(),
840 )
841 }
842
843 #[rstest]
844 #[should_panic(
845 expected = r#"Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"#
846 )]
847 fn test_venue_mismatch_between_exchange_and_instrument(
848 crypto_perpetual_ethusdt: CryptoPerpetual,
849 ) {
850 let exchange = get_exchange(
851 Venue::new("SIM"),
852 AccountType::Margin,
853 BookType::L1_MBP,
854 None,
855 );
856 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
857 exchange.borrow_mut().add_instrument(instrument).unwrap();
858 }
859
860 #[rstest]
861 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
862 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
863 let exchange = get_exchange(
864 Venue::new("BINANCE"),
865 AccountType::Cash,
866 BookType::L1_MBP,
867 None,
868 );
869 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
870 exchange.borrow_mut().add_instrument(instrument).unwrap();
871 }
872
873 #[rstest]
874 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
875 let exchange = get_exchange(
876 Venue::new("BINANCE"),
877 AccountType::Margin,
878 BookType::L1_MBP,
879 None,
880 );
881 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
882
883 exchange.borrow_mut().add_instrument(instrument).unwrap();
885
886 let quote_tick = QuoteTick::new(
888 crypto_perpetual_ethusdt.id,
889 Price::from("1000"),
890 Price::from("1001"),
891 Quantity::from(1),
892 Quantity::from(1),
893 UnixNanos::default(),
894 UnixNanos::default(),
895 );
896 exchange.borrow_mut().process_quote_tick("e_tick);
897
898 let best_bid_price = exchange
899 .borrow()
900 .best_bid_price(crypto_perpetual_ethusdt.id);
901 assert_eq!(best_bid_price, Some(Price::from("1000")));
902 let best_ask_price = exchange
903 .borrow()
904 .best_ask_price(crypto_perpetual_ethusdt.id);
905 assert_eq!(best_ask_price, Some(Price::from("1001")));
906 }
907
908 #[rstest]
909 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
910 let exchange = get_exchange(
911 Venue::new("BINANCE"),
912 AccountType::Margin,
913 BookType::L1_MBP,
914 None,
915 );
916 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
917
918 exchange.borrow_mut().add_instrument(instrument).unwrap();
920
921 let trade_tick = TradeTick::new(
923 crypto_perpetual_ethusdt.id,
924 Price::from("1000"),
925 Quantity::from(1),
926 AggressorSide::Buyer,
927 TradeId::from("1"),
928 UnixNanos::default(),
929 UnixNanos::default(),
930 );
931 exchange.borrow_mut().process_trade_tick(&trade_tick);
932
933 let best_bid_price = exchange
934 .borrow()
935 .best_bid_price(crypto_perpetual_ethusdt.id);
936 assert_eq!(best_bid_price, Some(Price::from("1000")));
937 let best_ask = exchange
938 .borrow()
939 .best_ask_price(crypto_perpetual_ethusdt.id);
940 assert_eq!(best_ask, Some(Price::from("1000")));
941 }
942
943 #[rstest]
944 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
945 let exchange = get_exchange(
946 Venue::new("BINANCE"),
947 AccountType::Margin,
948 BookType::L1_MBP,
949 None,
950 );
951 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
952
953 exchange.borrow_mut().add_instrument(instrument).unwrap();
955
956 let bar = Bar::new(
958 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
959 Price::from("1500.00"),
960 Price::from("1505.00"),
961 Price::from("1490.00"),
962 Price::from("1502.00"),
963 Quantity::from(100),
964 UnixNanos::default(),
965 UnixNanos::default(),
966 );
967 exchange.borrow_mut().process_bar(bar);
968
969 let best_bid_price = exchange
971 .borrow()
972 .best_bid_price(crypto_perpetual_ethusdt.id);
973 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
974 let best_ask_price = exchange
975 .borrow()
976 .best_ask_price(crypto_perpetual_ethusdt.id);
977 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
978 }
979
980 #[rstest]
981 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
982 let exchange = get_exchange(
983 Venue::new("BINANCE"),
984 AccountType::Margin,
985 BookType::L1_MBP,
986 None,
987 );
988 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
989
990 exchange.borrow_mut().add_instrument(instrument).unwrap();
992
993 let bar_bid = Bar::new(
996 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
997 Price::from("1500.00"),
998 Price::from("1505.00"),
999 Price::from("1490.00"),
1000 Price::from("1502.00"),
1001 Quantity::from(100),
1002 UnixNanos::from(1),
1003 UnixNanos::from(1),
1004 );
1005 let bar_ask = Bar::new(
1006 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1007 Price::from("1501.00"),
1008 Price::from("1506.00"),
1009 Price::from("1491.00"),
1010 Price::from("1503.00"),
1011 Quantity::from(100),
1012 UnixNanos::from(1),
1013 UnixNanos::from(1),
1014 );
1015
1016 exchange.borrow_mut().process_bar(bar_bid);
1018 exchange.borrow_mut().process_bar(bar_ask);
1019
1020 let best_bid_price = exchange
1022 .borrow()
1023 .best_bid_price(crypto_perpetual_ethusdt.id);
1024 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1025 let best_ask_price = exchange
1026 .borrow()
1027 .best_ask_price(crypto_perpetual_ethusdt.id);
1028 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1029 }
1030
1031 #[rstest]
1032 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1033 let exchange = get_exchange(
1034 Venue::new("BINANCE"),
1035 AccountType::Margin,
1036 BookType::L2_MBP,
1037 None,
1038 );
1039 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1040
1041 exchange.borrow_mut().add_instrument(instrument).unwrap();
1043
1044 let delta_buy = OrderBookDelta::new(
1046 crypto_perpetual_ethusdt.id,
1047 BookAction::Add,
1048 BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1049 0,
1050 0,
1051 UnixNanos::from(1),
1052 UnixNanos::from(1),
1053 );
1054 let delta_sell = OrderBookDelta::new(
1055 crypto_perpetual_ethusdt.id,
1056 BookAction::Add,
1057 BookOrder::new(
1058 OrderSide::Sell,
1059 Price::from("1001.00"),
1060 Quantity::from(1),
1061 1,
1062 ),
1063 0,
1064 1,
1065 UnixNanos::from(2),
1066 UnixNanos::from(2),
1067 );
1068
1069 exchange.borrow_mut().process_order_book_delta(delta_buy);
1071 exchange.borrow_mut().process_order_book_delta(delta_sell);
1072
1073 let book = exchange
1074 .borrow()
1075 .get_book(crypto_perpetual_ethusdt.id)
1076 .unwrap()
1077 .clone();
1078 assert_eq!(book.update_count, 2);
1079 assert_eq!(book.sequence, 1);
1080 assert_eq!(book.ts_last, UnixNanos::from(2));
1081 let best_bid_price = exchange
1082 .borrow()
1083 .best_bid_price(crypto_perpetual_ethusdt.id);
1084 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1085 let best_ask_price = exchange
1086 .borrow()
1087 .best_ask_price(crypto_perpetual_ethusdt.id);
1088 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1089 }
1090
1091 #[rstest]
1092 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1093 let exchange = get_exchange(
1094 Venue::new("BINANCE"),
1095 AccountType::Margin,
1096 BookType::L2_MBP,
1097 None,
1098 );
1099 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1100
1101 exchange.borrow_mut().add_instrument(instrument).unwrap();
1103
1104 let delta_sell_1 = OrderBookDelta::new(
1106 crypto_perpetual_ethusdt.id,
1107 BookAction::Add,
1108 BookOrder::new(
1109 OrderSide::Sell,
1110 Price::from("1000.00"),
1111 Quantity::from(3),
1112 1,
1113 ),
1114 0,
1115 0,
1116 UnixNanos::from(1),
1117 UnixNanos::from(1),
1118 );
1119 let delta_sell_2 = OrderBookDelta::new(
1120 crypto_perpetual_ethusdt.id,
1121 BookAction::Add,
1122 BookOrder::new(
1123 OrderSide::Sell,
1124 Price::from("1001.00"),
1125 Quantity::from(1),
1126 1,
1127 ),
1128 0,
1129 1,
1130 UnixNanos::from(1),
1131 UnixNanos::from(1),
1132 );
1133 let orderbook_deltas = OrderBookDeltas::new(
1134 crypto_perpetual_ethusdt.id,
1135 vec![delta_sell_1, delta_sell_2],
1136 );
1137
1138 exchange
1140 .borrow_mut()
1141 .process_order_book_deltas(orderbook_deltas);
1142
1143 let book = exchange
1144 .borrow()
1145 .get_book(crypto_perpetual_ethusdt.id)
1146 .unwrap()
1147 .clone();
1148 assert_eq!(book.update_count, 2);
1149 assert_eq!(book.sequence, 1);
1150 assert_eq!(book.ts_last, UnixNanos::from(1));
1151 let best_bid_price = exchange
1152 .borrow()
1153 .best_bid_price(crypto_perpetual_ethusdt.id);
1154 assert_eq!(best_bid_price, None);
1156 let best_ask_price = exchange
1157 .borrow()
1158 .best_ask_price(crypto_perpetual_ethusdt.id);
1159 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1161 }
1162
1163 #[rstest]
1164 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1165 let exchange = get_exchange(
1166 Venue::new("BINANCE"),
1167 AccountType::Margin,
1168 BookType::L2_MBP,
1169 None,
1170 );
1171 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1172
1173 exchange.borrow_mut().add_instrument(instrument).unwrap();
1175
1176 let instrument_status = InstrumentStatus::new(
1177 crypto_perpetual_ethusdt.id,
1178 MarketStatusAction::Close, UnixNanos::from(1),
1180 UnixNanos::from(1),
1181 None,
1182 None,
1183 None,
1184 None,
1185 None,
1186 );
1187
1188 exchange
1189 .borrow_mut()
1190 .process_instrument_status(instrument_status);
1191
1192 let market_status = exchange
1193 .borrow()
1194 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1195 .unwrap()
1196 .market_status;
1197 assert_eq!(market_status, MarketStatus::Closed);
1198 }
1199
1200 #[rstest]
1201 fn test_accounting() {
1202 let account_type = AccountType::Margin;
1203 let mut cache = Cache::default();
1204 let handler = get_message_saving_handler::<AccountState>(None);
1205 msgbus::register(Ustr::from("Portfolio.update_account"), handler.clone());
1206 let margin_account = MarginAccount::new(
1207 AccountState::new(
1208 AccountId::from("SIM-001"),
1209 account_type,
1210 vec![AccountBalance::new(
1211 Money::from("1000 USD"),
1212 Money::from("0 USD"),
1213 Money::from("1000 USD"),
1214 )],
1215 vec![],
1216 false,
1217 UUID4::default(),
1218 UnixNanos::default(),
1219 UnixNanos::default(),
1220 None,
1221 ),
1222 false,
1223 );
1224 let () = cache
1225 .add_account(AccountAny::Margin(margin_account))
1226 .unwrap();
1227 cache.build_index();
1229
1230 let exchange = get_exchange(
1231 Venue::new("SIM"),
1232 account_type,
1233 BookType::L2_MBP,
1234 Some(Rc::new(RefCell::new(cache))),
1235 );
1236 exchange.borrow_mut().initialize_account();
1237
1238 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1240
1241 let messages = get_saved_messages::<AccountState>(handler);
1243 assert_eq!(messages.len(), 2);
1244 let account_state_first = messages.first().unwrap();
1245 let account_state_second = messages.last().unwrap();
1246
1247 assert_eq!(account_state_first.balances.len(), 1);
1248 let current_balance = account_state_first.balances[0];
1249 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1250 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1251 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1252
1253 assert_eq!(account_state_second.balances.len(), 1);
1254 let current_balance = account_state_second.balances[0];
1255 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1256 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1257 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1258 }
1259
1260 #[rstest]
1261 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1262 let inflight1 = InflightCommand::new(
1264 UnixNanos::from(100),
1265 1,
1266 create_submit_order_command(UnixNanos::from(100)),
1267 );
1268 let inflight2 = InflightCommand::new(
1269 UnixNanos::from(200),
1270 2,
1271 create_submit_order_command(UnixNanos::from(200)),
1272 );
1273 let inflight3 = InflightCommand::new(
1274 UnixNanos::from(100),
1275 2,
1276 create_submit_order_command(UnixNanos::from(100)),
1277 );
1278
1279 let mut inflight_heap = BinaryHeap::new();
1281 inflight_heap.push(inflight1);
1282 inflight_heap.push(inflight2);
1283 inflight_heap.push(inflight3);
1284
1285 let first = inflight_heap.pop().unwrap();
1288 let second = inflight_heap.pop().unwrap();
1289 let third = inflight_heap.pop().unwrap();
1290
1291 assert_eq!(first.ts, UnixNanos::from(100));
1292 assert_eq!(first.counter, 1);
1293 assert_eq!(second.ts, UnixNanos::from(100));
1294 assert_eq!(second.counter, 2);
1295 assert_eq!(third.ts, UnixNanos::from(200));
1296 assert_eq!(third.counter, 2);
1297 }
1298
1299 #[rstest]
1300 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1301 let exchange = get_exchange(
1302 Venue::new("BINANCE"),
1303 AccountType::Margin,
1304 BookType::L2_MBP,
1305 None,
1306 );
1307
1308 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1309 exchange.borrow_mut().add_instrument(instrument).unwrap();
1310
1311 let command1 = create_submit_order_command(UnixNanos::from(100));
1312 let command2 = create_submit_order_command(UnixNanos::from(200));
1313
1314 exchange.borrow_mut().send(command1);
1315 exchange.borrow_mut().send(command2);
1316
1317 assert_eq!(exchange.borrow().message_queue.len(), 2);
1320 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1321
1322 exchange.borrow_mut().process(UnixNanos::from(300));
1324 assert_eq!(exchange.borrow().message_queue.len(), 0);
1325 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1326 }
1327
1328 #[rstest]
1329 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1330 let latency_model = LatencyModel::new(
1331 UnixNanos::from(100),
1332 UnixNanos::from(200),
1333 UnixNanos::from(300),
1334 UnixNanos::from(100),
1335 );
1336 let exchange = get_exchange(
1337 Venue::new("BINANCE"),
1338 AccountType::Margin,
1339 BookType::L2_MBP,
1340 None,
1341 );
1342 exchange.borrow_mut().set_latency_model(latency_model);
1343
1344 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1345 exchange.borrow_mut().add_instrument(instrument).unwrap();
1346
1347 let command1 = create_submit_order_command(UnixNanos::from(100));
1348 let command2 = create_submit_order_command(UnixNanos::from(150));
1349 exchange.borrow_mut().send(command1);
1350 exchange.borrow_mut().send(command2);
1351
1352 assert_eq!(exchange.borrow().message_queue.len(), 0);
1354 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1355 assert_eq!(
1357 exchange.borrow().inflight_queue.iter().nth(0).unwrap().ts,
1358 UnixNanos::from(300)
1359 );
1360 assert_eq!(
1362 exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1363 UnixNanos::from(350)
1364 );
1365
1366 exchange.borrow_mut().process(UnixNanos::from(320));
1368 assert_eq!(exchange.borrow().message_queue.len(), 0);
1369 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1370 assert_eq!(
1371 exchange.borrow().inflight_queue.iter().nth(0).unwrap().ts,
1372 UnixNanos::from(350)
1373 );
1374 }
1375}