1use std::{
19 cell::RefCell,
20 collections::{BinaryHeap, VecDeque},
21 fmt::Debug,
22 rc::Rc,
23};
24
25use ahash::AHashMap;
26use nautilus_common::{
27 cache::Cache, clients::ExecutionClient, clock::Clock, messages::execution::TradingCommand,
28};
29use nautilus_core::{
30 UnixNanos,
31 correctness::{FAILED, check_equal},
32};
33use nautilus_execution::{
34 matching_core::OrderMatchInfo,
35 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
36 models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel},
37};
38use nautilus_model::{
39 accounts::AccountAny,
40 data::{
41 Bar, Data, InstrumentClose, InstrumentStatus, OrderBookDelta, OrderBookDeltas,
42 OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
43 },
44 enums::{AccountType, BookType, OmsType},
45 identifiers::{InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47 orderbook::OrderBook,
48 orders::OrderAny,
49 types::{AccountBalance, Currency, Money, Price},
50};
51use rust_decimal::Decimal;
52
53use crate::modules::SimulationModule;
54
55#[derive(Debug, Eq, PartialEq)]
59struct InflightCommand {
60 timestamp: UnixNanos,
61 counter: u32,
62 command: TradingCommand,
63}
64
65impl InflightCommand {
66 const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
67 Self {
68 timestamp,
69 counter,
70 command,
71 }
72 }
73}
74
75impl Ord for InflightCommand {
76 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
77 other
79 .timestamp
80 .cmp(&self.timestamp)
81 .then_with(|| other.counter.cmp(&self.counter))
82 }
83}
84
85impl PartialOrd for InflightCommand {
86 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
87 Some(self.cmp(other))
88 }
89}
90
91pub struct SimulatedExchange {
107 pub id: Venue,
108 pub oms_type: OmsType,
109 pub account_type: AccountType,
110 starting_balances: Vec<Money>,
111 book_type: BookType,
112 default_leverage: Decimal,
113 exec_client: Option<Rc<dyn ExecutionClient>>,
114 pub base_currency: Option<Currency>,
115 fee_model: FeeModelAny,
116 fill_model: FillModelAny,
117 latency_model: Option<Box<dyn LatencyModel>>,
118 instruments: AHashMap<InstrumentId, InstrumentAny>,
119 matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
120 leverages: AHashMap<InstrumentId, Decimal>,
121 modules: Vec<Box<dyn SimulationModule>>,
122 clock: Rc<RefCell<dyn Clock>>,
123 cache: Rc<RefCell<Cache>>,
124 message_queue: VecDeque<TradingCommand>,
125 inflight_queue: BinaryHeap<InflightCommand>,
126 inflight_counter: AHashMap<UnixNanos, u32>,
127 bar_execution: bool,
128 bar_adaptive_high_low_ordering: bool,
129 trade_execution: bool,
130 liquidity_consumption: bool,
131 reject_stop_orders: bool,
132 support_gtd_orders: bool,
133 support_contingent_orders: bool,
134 use_position_ids: bool,
135 use_random_ids: bool,
136 use_reduce_only: bool,
137 use_message_queue: bool,
138 use_market_order_acks: bool,
139 _allow_cash_borrowing: bool,
140 frozen_account: bool,
141 price_protection_points: u32,
142}
143
144impl Debug for SimulatedExchange {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct(stringify!(SimulatedExchange))
147 .field("id", &self.id)
148 .field("account_type", &self.account_type)
149 .finish()
150 }
151}
152
153impl SimulatedExchange {
154 #[allow(clippy::too_many_arguments)]
162 pub fn new(
163 venue: Venue,
164 oms_type: OmsType,
165 account_type: AccountType,
166 starting_balances: Vec<Money>,
167 base_currency: Option<Currency>,
168 default_leverage: Decimal,
169 leverages: AHashMap<InstrumentId, Decimal>,
170 modules: Vec<Box<dyn SimulationModule>>,
171 cache: Rc<RefCell<Cache>>,
172 clock: Rc<RefCell<dyn Clock>>,
173 fill_model: FillModelAny,
174 fee_model: FeeModelAny,
175 book_type: BookType,
176 latency_model: Option<Box<dyn LatencyModel>>,
177 bar_execution: Option<bool>,
178 bar_adaptive_high_low_ordering: Option<bool>,
179 trade_execution: Option<bool>,
180 liquidity_consumption: Option<bool>,
181 reject_stop_orders: Option<bool>,
182 support_gtd_orders: Option<bool>,
183 support_contingent_orders: Option<bool>,
184 use_position_ids: Option<bool>,
185 use_random_ids: Option<bool>,
186 use_reduce_only: Option<bool>,
187 use_message_queue: Option<bool>,
188 use_market_order_acks: Option<bool>,
189 allow_cash_borrowing: Option<bool>,
190 frozen_account: Option<bool>,
191 price_protection_points: Option<u32>,
192 ) -> anyhow::Result<Self> {
193 if starting_balances.is_empty() {
194 anyhow::bail!("Starting balances must be provided")
195 }
196 if base_currency.is_some() && starting_balances.len() > 1 {
197 anyhow::bail!("single-currency account has multiple starting currencies")
198 }
199 Ok(Self {
201 id: venue,
202 oms_type,
203 account_type,
204 starting_balances,
205 book_type,
206 default_leverage,
207 exec_client: None,
208 base_currency,
209 fee_model,
210 fill_model,
211 latency_model,
212 instruments: AHashMap::new(),
213 matching_engines: AHashMap::new(),
214 leverages,
215 modules,
216 clock,
217 cache,
218 message_queue: VecDeque::new(),
219 inflight_queue: BinaryHeap::new(),
220 inflight_counter: AHashMap::new(),
221 bar_execution: bar_execution.unwrap_or(true),
222 bar_adaptive_high_low_ordering: bar_adaptive_high_low_ordering.unwrap_or(false),
223 trade_execution: trade_execution.unwrap_or(true),
224 liquidity_consumption: liquidity_consumption.unwrap_or(true),
225 reject_stop_orders: reject_stop_orders.unwrap_or(true),
226 support_gtd_orders: support_gtd_orders.unwrap_or(true),
227 support_contingent_orders: support_contingent_orders.unwrap_or(true),
228 use_position_ids: use_position_ids.unwrap_or(true),
229 use_random_ids: use_random_ids.unwrap_or(false),
230 use_reduce_only: use_reduce_only.unwrap_or(true),
231 use_message_queue: use_message_queue.unwrap_or(true),
232 use_market_order_acks: use_market_order_acks.unwrap_or(false),
233 _allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
234 frozen_account: frozen_account.unwrap_or(false),
235 price_protection_points: price_protection_points.unwrap_or(0),
236 })
237 }
238
239 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
240 self.exec_client = Some(client);
241 }
242
243 pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
244 for matching_engine in self.matching_engines.values_mut() {
245 matching_engine.set_fill_model(fill_model.clone());
246 log::info!(
247 "Setting fill model for {} to {}",
248 matching_engine.venue,
249 self.fill_model
250 );
251 }
252 self.fill_model = fill_model;
253 }
254
255 pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
256 self.latency_model = Some(latency_model);
257 }
258
259 pub fn initialize_account(&mut self) {
260 self.generate_fresh_account_state();
261 }
262
263 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
273 check_equal(
274 &instrument.id().venue,
275 &self.id,
276 "Venue of instrument id",
277 "Venue of simulated exchange",
278 )
279 .expect(FAILED);
280
281 if self.account_type == AccountType::Cash
282 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
283 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
284 {
285 anyhow::bail!("Cash account cannot trade futures or perpetuals")
286 }
287
288 self.instruments.insert(instrument.id(), instrument.clone());
289
290 let price_protection = if self.price_protection_points == 0 {
291 None
292 } else {
293 Some(self.price_protection_points)
294 };
295
296 let matching_engine_config = OrderMatchingEngineConfig::new(
297 self.bar_execution,
298 self.bar_adaptive_high_low_ordering,
299 self.trade_execution,
300 self.liquidity_consumption,
301 self.reject_stop_orders,
302 self.support_gtd_orders,
303 self.support_contingent_orders,
304 self.use_position_ids,
305 self.use_random_ids,
306 self.use_reduce_only,
307 self.use_market_order_acks,
308 )
309 .with_price_protection_points(price_protection);
310 let instrument_id = instrument.id();
311 let matching_engine = OrderMatchingEngine::new(
312 instrument,
313 self.instruments.len() as u32,
314 self.fill_model.clone(),
315 self.fee_model.clone(),
316 self.book_type,
317 self.oms_type,
318 self.account_type,
319 self.clock.clone(),
320 Rc::clone(&self.cache),
321 matching_engine_config,
322 );
323 self.matching_engines.insert(instrument_id, matching_engine);
324
325 log::info!("Added instrument {instrument_id} and created matching engine");
326 Ok(())
327 }
328
329 #[must_use]
330 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
331 self.matching_engines
332 .get(&instrument_id)
333 .and_then(OrderMatchingEngine::best_bid_price)
334 }
335
336 #[must_use]
337 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
338 self.matching_engines
339 .get(&instrument_id)
340 .and_then(OrderMatchingEngine::best_ask_price)
341 }
342
343 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
344 self.matching_engines
345 .get(&instrument_id)
346 .map(OrderMatchingEngine::get_book)
347 }
348
349 #[must_use]
350 pub fn get_matching_engine(
351 &self,
352 instrument_id: &InstrumentId,
353 ) -> Option<&OrderMatchingEngine> {
354 self.matching_engines.get(instrument_id)
355 }
356
357 #[must_use]
358 pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
359 &self.matching_engines
360 }
361
362 #[must_use]
363 pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
364 let mut books = AHashMap::new();
365 for (instrument_id, matching_engine) in &self.matching_engines {
366 books.insert(*instrument_id, matching_engine.get_book().clone());
367 }
368 books
369 }
370
371 #[must_use]
372 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
373 instrument_id
374 .and_then(|id| {
375 self.matching_engines
376 .get(&id)
377 .map(OrderMatchingEngine::get_open_orders)
378 })
379 .unwrap_or_else(|| {
380 self.matching_engines
381 .values()
382 .flat_map(OrderMatchingEngine::get_open_orders)
383 .collect()
384 })
385 }
386
387 #[must_use]
388 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
389 instrument_id
390 .and_then(|id| {
391 self.matching_engines
392 .get(&id)
393 .map(|engine| engine.get_open_bid_orders().to_vec())
394 })
395 .unwrap_or_else(|| {
396 self.matching_engines
397 .values()
398 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
399 .collect()
400 })
401 }
402
403 #[must_use]
404 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
405 instrument_id
406 .and_then(|id| {
407 self.matching_engines
408 .get(&id)
409 .map(|engine| engine.get_open_ask_orders().to_vec())
410 })
411 .unwrap_or_else(|| {
412 self.matching_engines
413 .values()
414 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
415 .collect()
416 })
417 }
418
419 #[must_use]
423 pub fn get_account(&self) -> Option<AccountAny> {
424 self.exec_client
425 .as_ref()
426 .map(|client| client.get_account().unwrap())
427 }
428
429 #[must_use]
431 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
432 &self.cache
433 }
434
435 pub fn adjust_account(&mut self, adjustment: Money) {
439 if self.frozen_account {
440 return;
442 }
443
444 if let Some(exec_client) = &self.exec_client {
445 let venue = exec_client.venue();
446 println!("Adjusting account for venue {venue}");
447 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
448 match account.balance(Some(adjustment.currency)) {
449 Some(balance) => {
450 let mut current_balance = *balance;
451 current_balance.total = current_balance.total + adjustment;
452 current_balance.free = current_balance.free + adjustment;
453
454 let margins = match account {
455 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
456 _ => AHashMap::new(),
457 };
458
459 if let Some(exec_client) = &self.exec_client {
460 exec_client
461 .generate_account_state(
462 vec![current_balance],
463 margins.values().copied().collect(),
464 true,
465 self.clock.borrow().timestamp_ns(),
466 )
467 .unwrap();
468 }
469 }
470 None => {
471 log::error!(
472 "Cannot adjust account: no balance for currency {}",
473 adjustment.currency
474 );
475 }
476 }
477 } else {
478 log::error!("Cannot adjust account: no account for venue {venue}");
479 }
480 }
481 }
482
483 #[must_use]
484 pub fn has_pending_commands(&self, ts_now: UnixNanos) -> bool {
485 if !self.message_queue.is_empty() {
486 return true;
487 }
488 self.inflight_queue
489 .peek()
490 .is_some_and(|inflight| inflight.timestamp <= ts_now)
491 }
492
493 pub fn send(&mut self, command: TradingCommand) {
494 if !self.use_message_queue {
495 self.process_trading_command(command);
496 } else if self.latency_model.is_none() {
497 self.message_queue.push_back(command);
498 } else {
499 let (timestamp, counter) = self.generate_inflight_command(&command);
500 self.inflight_queue
501 .push(InflightCommand::new(timestamp, counter, command));
502 }
503 }
504
505 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
509 if let Some(latency_model) = &self.latency_model {
510 let ts = match command {
511 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
512 command.ts_init() + latency_model.get_insert_latency()
513 }
514 TradingCommand::ModifyOrder(_) => {
515 command.ts_init() + latency_model.get_update_latency()
516 }
517 TradingCommand::CancelOrder(_)
518 | TradingCommand::CancelAllOrders(_)
519 | TradingCommand::BatchCancelOrders(_) => {
520 command.ts_init() + latency_model.get_delete_latency()
521 }
522 _ => panic!("Cannot handle command: {command:?}"),
523 };
524
525 let counter = self
526 .inflight_counter
527 .entry(ts)
528 .and_modify(|e| *e += 1)
529 .or_insert(1);
530
531 (ts, *counter)
532 } else {
533 panic!("Latency model should be initialized");
534 }
535 }
536
537 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
541 for module in &self.modules {
542 module.pre_process(Data::Delta(delta));
543 }
544
545 if !self.matching_engines.contains_key(&delta.instrument_id) {
546 let instrument = {
547 let cache = self.cache.as_ref().borrow();
548 cache.instrument(&delta.instrument_id).cloned()
549 };
550
551 if let Some(instrument) = instrument {
552 self.add_instrument(instrument).unwrap();
553 } else {
554 panic!(
555 "No matching engine found for instrument {}",
556 delta.instrument_id
557 );
558 }
559 }
560
561 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
562 matching_engine.process_order_book_delta(&delta).unwrap();
563 } else {
564 panic!("Matching engine should be initialized");
565 }
566 }
567
568 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
572 for module in &self.modules {
573 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
574 }
575
576 if !self.matching_engines.contains_key(&deltas.instrument_id) {
577 let instrument = {
578 let cache = self.cache.as_ref().borrow();
579 cache.instrument(&deltas.instrument_id).cloned()
580 };
581
582 if let Some(instrument) = instrument {
583 self.add_instrument(instrument).unwrap();
584 } else {
585 panic!(
586 "No matching engine found for instrument {}",
587 deltas.instrument_id
588 );
589 }
590 }
591
592 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
593 matching_engine.process_order_book_deltas(&deltas).unwrap();
594 } else {
595 panic!("Matching engine should be initialized");
596 }
597 }
598
599 pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) {
603 for module in &self.modules {
604 module.pre_process(Data::Depth10(Box::new(*depth)));
605 }
606
607 if !self.matching_engines.contains_key(&depth.instrument_id) {
608 let instrument = {
609 let cache = self.cache.as_ref().borrow();
610 cache.instrument(&depth.instrument_id).cloned()
611 };
612
613 if let Some(instrument) = instrument {
614 self.add_instrument(instrument).unwrap();
615 } else {
616 panic!(
617 "No matching engine found for instrument {}",
618 depth.instrument_id
619 );
620 }
621 }
622
623 if let Some(matching_engine) = self.matching_engines.get_mut(&depth.instrument_id) {
624 matching_engine.process_order_book_depth10(depth).unwrap();
625 } else {
626 panic!("Matching engine should be initialized");
627 }
628 }
629
630 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
634 for module in &self.modules {
635 module.pre_process(Data::Quote(quote.to_owned()));
636 }
637
638 if !self.matching_engines.contains_key("e.instrument_id) {
639 let instrument = {
640 let cache = self.cache.as_ref().borrow();
641 cache.instrument("e.instrument_id).cloned()
642 };
643
644 if let Some(instrument) = instrument {
645 self.add_instrument(instrument).unwrap();
646 } else {
647 panic!(
648 "No matching engine found for instrument {}",
649 quote.instrument_id
650 );
651 }
652 }
653
654 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
655 matching_engine.process_quote_tick(quote);
656 } else {
657 panic!("Matching engine should be initialized");
658 }
659 }
660
661 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
665 for module in &self.modules {
666 module.pre_process(Data::Trade(trade.to_owned()));
667 }
668
669 if !self.matching_engines.contains_key(&trade.instrument_id) {
670 let instrument = {
671 let cache = self.cache.as_ref().borrow();
672 cache.instrument(&trade.instrument_id).cloned()
673 };
674
675 if let Some(instrument) = instrument {
676 self.add_instrument(instrument).unwrap();
677 } else {
678 panic!(
679 "No matching engine found for instrument {}",
680 trade.instrument_id
681 );
682 }
683 }
684
685 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
686 matching_engine.process_trade_tick(trade);
687 } else {
688 panic!("Matching engine should be initialized");
689 }
690 }
691
692 pub fn process_bar(&mut self, bar: Bar) {
696 for module in &self.modules {
697 module.pre_process(Data::Bar(bar));
698 }
699
700 if !self.matching_engines.contains_key(&bar.instrument_id()) {
701 let instrument = {
702 let cache = self.cache.as_ref().borrow();
703 cache.instrument(&bar.instrument_id()).cloned()
704 };
705
706 if let Some(instrument) = instrument {
707 self.add_instrument(instrument).unwrap();
708 } else {
709 panic!(
710 "No matching engine found for instrument {}",
711 bar.instrument_id()
712 );
713 }
714 }
715
716 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
717 matching_engine.process_bar(&bar);
718 } else {
719 panic!("Matching engine should be initialized");
720 }
721 }
722
723 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
727 if !self.matching_engines.contains_key(&status.instrument_id) {
730 let instrument = {
731 let cache = self.cache.as_ref().borrow();
732 cache.instrument(&status.instrument_id).cloned()
733 };
734
735 if let Some(instrument) = instrument {
736 self.add_instrument(instrument).unwrap();
737 } else {
738 panic!(
739 "No matching engine found for instrument {}",
740 status.instrument_id
741 );
742 }
743 }
744
745 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
746 matching_engine.process_status(status.action);
747 } else {
748 panic!("Matching engine should be initialized");
749 }
750 }
751
752 pub fn process_instrument_close(&mut self, close: InstrumentClose) {
756 for module in &self.modules {
757 module.pre_process(Data::InstrumentClose(close));
758 }
759
760 if !self.matching_engines.contains_key(&close.instrument_id) {
761 let instrument = {
762 let cache = self.cache.as_ref().borrow();
763 cache.instrument(&close.instrument_id).cloned()
764 };
765
766 if let Some(instrument) = instrument {
767 self.add_instrument(instrument).unwrap();
768 } else {
769 panic!(
770 "No matching engine found for instrument {}",
771 close.instrument_id
772 );
773 }
774 }
775
776 if let Some(matching_engine) = self.matching_engines.get_mut(&close.instrument_id) {
777 matching_engine.process_instrument_close(close);
778 } else {
779 panic!("Matching engine should be initialized");
780 }
781 }
782
783 pub fn process(&mut self, ts_now: UnixNanos) {
787 while let Some(inflight) = self.inflight_queue.peek() {
791 if inflight.timestamp > ts_now {
792 break;
794 }
795 let inflight = self.inflight_queue.pop().unwrap();
797 self.process_trading_command(inflight.command);
798 }
799
800 while let Some(command) = self.message_queue.pop_front() {
802 self.process_trading_command(command);
803 }
804 }
805
806 pub fn reset(&mut self) {
807 for module in &self.modules {
808 module.reset();
809 }
810
811 self.generate_fresh_account_state();
812
813 for matching_engine in self.matching_engines.values_mut() {
814 matching_engine.reset();
815 }
816
817 self.message_queue.clear();
818 self.inflight_queue.clear();
819
820 log::info!("Resetting exchange state");
821 }
822
823 pub fn process_trading_command(&mut self, command: TradingCommand) {
827 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
828 let account_id = if let Some(exec_client) = &self.exec_client {
829 exec_client.account_id()
830 } else {
831 panic!("Execution client should be initialized");
832 };
833 match command {
834 TradingCommand::SubmitOrder(command) => {
835 let mut order = self
836 .cache
837 .borrow()
838 .order(&command.client_order_id)
839 .cloned()
840 .expect("Order must exist in cache");
841 matching_engine.process_order(&mut order, account_id);
842 }
843 TradingCommand::ModifyOrder(ref command) => {
844 matching_engine.process_modify(command, account_id);
845 }
846 TradingCommand::CancelOrder(ref command) => {
847 matching_engine.process_cancel(command, account_id);
848 }
849 TradingCommand::CancelAllOrders(ref command) => {
850 matching_engine.process_cancel_all(command, account_id);
851 }
852 TradingCommand::BatchCancelOrders(ref command) => {
853 matching_engine.process_batch_cancel(command, account_id);
854 }
855 TradingCommand::SubmitOrderList(ref command) => {
856 let mut orders: Vec<OrderAny> = self
857 .cache
858 .borrow()
859 .orders_for_ids(&command.order_list.client_order_ids, command);
860
861 for order in &mut orders {
862 matching_engine.process_order(order, account_id);
863 }
864 }
865 _ => {}
866 }
867 } else {
868 panic!(
869 "Matching engine not found for instrument {}",
870 command.instrument_id()
871 );
872 }
873 }
874
875 pub fn generate_fresh_account_state(&self) {
879 let balances: Vec<AccountBalance> = self
880 .starting_balances
881 .iter()
882 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
883 .collect();
884
885 if let Some(exec_client) = &self.exec_client {
886 exec_client
887 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
888 .unwrap();
889 }
890
891 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
893 margin_account.set_default_leverage(self.default_leverage);
894
895 for (instrument_id, leverage) in &self.leverages {
897 margin_account.set_leverage(*instrument_id, *leverage);
898 }
899 }
900 }
901}
902
903#[cfg(test)]
904mod tests {
905 use std::{cell::RefCell, collections::BinaryHeap, rc::Rc};
906
907 use ahash::AHashMap;
908 use nautilus_common::{
909 cache::Cache,
910 clock::TestClock,
911 messages::execution::{SubmitOrder, TradingCommand},
912 msgbus::{self, stubs::get_typed_message_saving_handler},
913 };
914 use nautilus_core::{UUID4, UnixNanos};
915 use nautilus_execution::models::{
916 fee::{FeeModelAny, MakerTakerFeeModel},
917 fill::FillModelAny,
918 latency::StaticLatencyModel,
919 };
920 use nautilus_model::{
921 accounts::{AccountAny, MarginAccount},
922 data::{
923 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
924 TradeTick,
925 },
926 enums::{
927 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
928 OmsType, OrderSide, OrderType,
929 },
930 events::AccountState,
931 identifiers::{
932 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
933 },
934 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
935 orders::{Order, OrderAny, OrderTestBuilder},
936 stubs::TestDefault,
937 types::{AccountBalance, Currency, Money, Price, Quantity},
938 };
939 use rstest::rstest;
940
941 use crate::{
942 exchange::{InflightCommand, SimulatedExchange},
943 execution_client::BacktestExecutionClient,
944 };
945
946 fn get_exchange(
947 venue: Venue,
948 account_type: AccountType,
949 book_type: BookType,
950 cache: Option<Rc<RefCell<Cache>>>,
951 ) -> Rc<RefCell<SimulatedExchange>> {
952 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
953 let clock = Rc::new(RefCell::new(TestClock::new()));
954 let exchange = Rc::new(RefCell::new(
955 SimulatedExchange::new(
956 venue,
957 OmsType::Netting,
958 account_type,
959 vec![Money::new(1000.0, Currency::USD())],
960 None,
961 1.into(),
962 AHashMap::new(),
963 vec![],
964 cache.clone(),
965 clock,
966 FillModelAny::default(),
967 FeeModelAny::MakerTaker(MakerTakerFeeModel),
968 book_type,
969 None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, )
986 .unwrap(),
987 ));
988
989 let clock = TestClock::new();
990 let execution_client = BacktestExecutionClient::new(
991 TraderId::test_default(),
992 AccountId::test_default(),
993 exchange.clone(),
994 cache,
995 Rc::new(RefCell::new(clock)),
996 None,
997 None,
998 );
999 exchange
1000 .borrow_mut()
1001 .register_client(Rc::new(execution_client));
1002
1003 exchange
1004 }
1005
1006 fn create_submit_order_command(
1007 ts_init: UnixNanos,
1008 client_order_id: &str,
1009 ) -> (OrderAny, TradingCommand) {
1010 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
1011 let order = OrderTestBuilder::new(OrderType::Market)
1012 .instrument_id(instrument_id)
1013 .client_order_id(ClientOrderId::new(client_order_id))
1014 .quantity(Quantity::from(1))
1015 .build();
1016 let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1017 TraderId::test_default(),
1018 None,
1019 StrategyId::test_default(),
1020 instrument_id,
1021 order.client_order_id(),
1022 order.init_event().clone(),
1023 None,
1024 None,
1025 None, UUID4::default(),
1027 ts_init,
1028 ));
1029 (order, command)
1030 }
1031
1032 #[rstest]
1033 #[should_panic(
1034 expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
1035 )]
1036 fn test_venue_mismatch_between_exchange_and_instrument(
1037 crypto_perpetual_ethusdt: CryptoPerpetual,
1038 ) {
1039 let exchange = get_exchange(
1040 Venue::new("SIM"),
1041 AccountType::Margin,
1042 BookType::L1_MBP,
1043 None,
1044 );
1045 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1046 exchange.borrow_mut().add_instrument(instrument).unwrap();
1047 }
1048
1049 #[rstest]
1050 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
1051 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
1052 let exchange = get_exchange(
1053 Venue::new("BINANCE"),
1054 AccountType::Cash,
1055 BookType::L1_MBP,
1056 None,
1057 );
1058 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1059 exchange.borrow_mut().add_instrument(instrument).unwrap();
1060 }
1061
1062 #[rstest]
1063 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1064 let exchange = get_exchange(
1065 Venue::new("BINANCE"),
1066 AccountType::Margin,
1067 BookType::L1_MBP,
1068 None,
1069 );
1070 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1071
1072 exchange.borrow_mut().add_instrument(instrument).unwrap();
1074
1075 let quote_tick = QuoteTick::new(
1077 crypto_perpetual_ethusdt.id,
1078 Price::from("1000.00"),
1079 Price::from("1001.00"),
1080 Quantity::from("1.000"),
1081 Quantity::from("1.000"),
1082 UnixNanos::default(),
1083 UnixNanos::default(),
1084 );
1085 exchange.borrow_mut().process_quote_tick("e_tick);
1086
1087 let best_bid_price = exchange
1088 .borrow()
1089 .best_bid_price(crypto_perpetual_ethusdt.id);
1090 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1091 let best_ask_price = exchange
1092 .borrow()
1093 .best_ask_price(crypto_perpetual_ethusdt.id);
1094 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1095 }
1096
1097 #[rstest]
1098 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1099 let exchange = get_exchange(
1100 Venue::new("BINANCE"),
1101 AccountType::Margin,
1102 BookType::L1_MBP,
1103 None,
1104 );
1105 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1106
1107 exchange.borrow_mut().add_instrument(instrument).unwrap();
1109
1110 let trade_tick = TradeTick::new(
1112 crypto_perpetual_ethusdt.id,
1113 Price::from("1000.00"),
1114 Quantity::from("1.000"),
1115 AggressorSide::Buyer,
1116 TradeId::from("1"),
1117 UnixNanos::default(),
1118 UnixNanos::default(),
1119 );
1120 exchange.borrow_mut().process_trade_tick(&trade_tick);
1121
1122 let best_bid_price = exchange
1123 .borrow()
1124 .best_bid_price(crypto_perpetual_ethusdt.id);
1125 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1126 let best_ask = exchange
1127 .borrow()
1128 .best_ask_price(crypto_perpetual_ethusdt.id);
1129 assert_eq!(best_ask, Some(Price::from("1000.00")));
1130 }
1131
1132 #[rstest]
1133 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1134 let exchange = get_exchange(
1135 Venue::new("BINANCE"),
1136 AccountType::Margin,
1137 BookType::L1_MBP,
1138 None,
1139 );
1140 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1141
1142 exchange.borrow_mut().add_instrument(instrument).unwrap();
1144
1145 let bar = Bar::new(
1147 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1148 Price::from("1500.00"),
1149 Price::from("1505.00"),
1150 Price::from("1490.00"),
1151 Price::from("1502.00"),
1152 Quantity::from("100.000"),
1153 UnixNanos::default(),
1154 UnixNanos::default(),
1155 );
1156 exchange.borrow_mut().process_bar(bar);
1157
1158 let best_bid_price = exchange
1160 .borrow()
1161 .best_bid_price(crypto_perpetual_ethusdt.id);
1162 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1163 let best_ask_price = exchange
1164 .borrow()
1165 .best_ask_price(crypto_perpetual_ethusdt.id);
1166 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1167 }
1168
1169 #[rstest]
1170 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1171 let exchange = get_exchange(
1172 Venue::new("BINANCE"),
1173 AccountType::Margin,
1174 BookType::L1_MBP,
1175 None,
1176 );
1177 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1178
1179 exchange.borrow_mut().add_instrument(instrument).unwrap();
1181
1182 let bar_bid = Bar::new(
1185 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1186 Price::from("1500.00"),
1187 Price::from("1505.00"),
1188 Price::from("1490.00"),
1189 Price::from("1502.00"),
1190 Quantity::from("100.000"),
1191 UnixNanos::from(1),
1192 UnixNanos::from(1),
1193 );
1194 let bar_ask = Bar::new(
1195 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1196 Price::from("1501.00"),
1197 Price::from("1506.00"),
1198 Price::from("1491.00"),
1199 Price::from("1503.00"),
1200 Quantity::from("100.000"),
1201 UnixNanos::from(1),
1202 UnixNanos::from(1),
1203 );
1204
1205 exchange.borrow_mut().process_bar(bar_bid);
1207 exchange.borrow_mut().process_bar(bar_ask);
1208
1209 let best_bid_price = exchange
1211 .borrow()
1212 .best_bid_price(crypto_perpetual_ethusdt.id);
1213 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1214 let best_ask_price = exchange
1215 .borrow()
1216 .best_ask_price(crypto_perpetual_ethusdt.id);
1217 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1218 }
1219
1220 #[rstest]
1221 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1222 let exchange = get_exchange(
1223 Venue::new("BINANCE"),
1224 AccountType::Margin,
1225 BookType::L2_MBP,
1226 None,
1227 );
1228 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1229
1230 exchange.borrow_mut().add_instrument(instrument).unwrap();
1232
1233 let delta_buy = OrderBookDelta::new(
1235 crypto_perpetual_ethusdt.id,
1236 BookAction::Add,
1237 BookOrder::new(
1238 OrderSide::Buy,
1239 Price::from("1000.00"),
1240 Quantity::from("1.000"),
1241 1,
1242 ),
1243 0,
1244 0,
1245 UnixNanos::from(1),
1246 UnixNanos::from(1),
1247 );
1248 let delta_sell = OrderBookDelta::new(
1249 crypto_perpetual_ethusdt.id,
1250 BookAction::Add,
1251 BookOrder::new(
1252 OrderSide::Sell,
1253 Price::from("1001.00"),
1254 Quantity::from("1.000"),
1255 1,
1256 ),
1257 0,
1258 1,
1259 UnixNanos::from(2),
1260 UnixNanos::from(2),
1261 );
1262
1263 exchange.borrow_mut().process_order_book_delta(delta_buy);
1265 exchange.borrow_mut().process_order_book_delta(delta_sell);
1266
1267 let book = exchange
1268 .borrow()
1269 .get_book(crypto_perpetual_ethusdt.id)
1270 .unwrap()
1271 .clone();
1272 assert_eq!(book.update_count, 2);
1273 assert_eq!(book.sequence, 1);
1274 assert_eq!(book.ts_last, UnixNanos::from(2));
1275 let best_bid_price = exchange
1276 .borrow()
1277 .best_bid_price(crypto_perpetual_ethusdt.id);
1278 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1279 let best_ask_price = exchange
1280 .borrow()
1281 .best_ask_price(crypto_perpetual_ethusdt.id);
1282 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1283 }
1284
1285 #[rstest]
1286 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1287 let exchange = get_exchange(
1288 Venue::new("BINANCE"),
1289 AccountType::Margin,
1290 BookType::L2_MBP,
1291 None,
1292 );
1293 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1294
1295 exchange.borrow_mut().add_instrument(instrument).unwrap();
1297
1298 let delta_sell_1 = OrderBookDelta::new(
1300 crypto_perpetual_ethusdt.id,
1301 BookAction::Add,
1302 BookOrder::new(
1303 OrderSide::Sell,
1304 Price::from("1000.00"),
1305 Quantity::from("3.000"),
1306 1,
1307 ),
1308 0,
1309 0,
1310 UnixNanos::from(1),
1311 UnixNanos::from(1),
1312 );
1313 let delta_sell_2 = OrderBookDelta::new(
1314 crypto_perpetual_ethusdt.id,
1315 BookAction::Add,
1316 BookOrder::new(
1317 OrderSide::Sell,
1318 Price::from("1001.00"),
1319 Quantity::from("1.000"),
1320 1,
1321 ),
1322 0,
1323 1,
1324 UnixNanos::from(1),
1325 UnixNanos::from(1),
1326 );
1327 let orderbook_deltas = OrderBookDeltas::new(
1328 crypto_perpetual_ethusdt.id,
1329 vec![delta_sell_1, delta_sell_2],
1330 );
1331
1332 exchange
1334 .borrow_mut()
1335 .process_order_book_deltas(orderbook_deltas);
1336
1337 let book = exchange
1338 .borrow()
1339 .get_book(crypto_perpetual_ethusdt.id)
1340 .unwrap()
1341 .clone();
1342 assert_eq!(book.update_count, 2);
1343 assert_eq!(book.sequence, 1);
1344 assert_eq!(book.ts_last, UnixNanos::from(1));
1345 let best_bid_price = exchange
1346 .borrow()
1347 .best_bid_price(crypto_perpetual_ethusdt.id);
1348 assert_eq!(best_bid_price, None);
1350 let best_ask_price = exchange
1351 .borrow()
1352 .best_ask_price(crypto_perpetual_ethusdt.id);
1353 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1355 }
1356
1357 #[rstest]
1358 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1359 let exchange = get_exchange(
1360 Venue::new("BINANCE"),
1361 AccountType::Margin,
1362 BookType::L2_MBP,
1363 None,
1364 );
1365 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1366
1367 exchange.borrow_mut().add_instrument(instrument).unwrap();
1369
1370 let instrument_status = InstrumentStatus::new(
1371 crypto_perpetual_ethusdt.id,
1372 MarketStatusAction::Close, UnixNanos::from(1),
1374 UnixNanos::from(1),
1375 None,
1376 None,
1377 None,
1378 None,
1379 None,
1380 );
1381
1382 exchange
1383 .borrow_mut()
1384 .process_instrument_status(instrument_status);
1385
1386 let market_status = exchange
1387 .borrow()
1388 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1389 .unwrap()
1390 .market_status;
1391 assert_eq!(market_status, MarketStatus::Closed);
1392 }
1393
1394 #[rstest]
1395 fn test_accounting() {
1396 let account_type = AccountType::Margin;
1397 let mut cache = Cache::default();
1398 let (handler, saving_handler) = get_typed_message_saving_handler::<AccountState>(None);
1399 msgbus::register_account_state_endpoint("Portfolio.update_account".into(), handler);
1400 let margin_account = MarginAccount::new(
1401 AccountState::new(
1402 AccountId::from("SIM-001"),
1403 account_type,
1404 vec![AccountBalance::new(
1405 Money::from("1000 USD"),
1406 Money::from("0 USD"),
1407 Money::from("1000 USD"),
1408 )],
1409 vec![],
1410 false,
1411 UUID4::default(),
1412 UnixNanos::default(),
1413 UnixNanos::default(),
1414 None,
1415 ),
1416 false,
1417 );
1418 let () = cache
1419 .add_account(AccountAny::Margin(margin_account))
1420 .unwrap();
1421 cache.build_index();
1423
1424 let exchange = get_exchange(
1425 Venue::new("SIM"),
1426 account_type,
1427 BookType::L2_MBP,
1428 Some(Rc::new(RefCell::new(cache))),
1429 );
1430 exchange.borrow_mut().initialize_account();
1431
1432 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1434
1435 let messages = saving_handler.get_messages();
1437 assert_eq!(messages.len(), 2);
1438 let account_state_first = messages.first().unwrap();
1439 let account_state_second = messages.last().unwrap();
1440
1441 assert_eq!(account_state_first.balances.len(), 1);
1442 let current_balance = account_state_first.balances[0];
1443 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1444 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1445 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1446
1447 assert_eq!(account_state_second.balances.len(), 1);
1448 let current_balance = account_state_second.balances[0];
1449 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1450 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1451 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1452 }
1453
1454 #[rstest]
1455 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1456 let (_, cmd1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1458 let (_, cmd2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1459 let (_, cmd3) = create_submit_order_command(UnixNanos::from(100), "O-3");
1460
1461 let inflight1 = InflightCommand::new(UnixNanos::from(100), 1, cmd1);
1462 let inflight2 = InflightCommand::new(UnixNanos::from(200), 2, cmd2);
1463 let inflight3 = InflightCommand::new(UnixNanos::from(100), 2, cmd3);
1464
1465 let mut inflight_heap = BinaryHeap::new();
1467 inflight_heap.push(inflight1);
1468 inflight_heap.push(inflight2);
1469 inflight_heap.push(inflight3);
1470
1471 let first = inflight_heap.pop().unwrap();
1474 let second = inflight_heap.pop().unwrap();
1475 let third = inflight_heap.pop().unwrap();
1476
1477 assert_eq!(first.timestamp, UnixNanos::from(100));
1478 assert_eq!(first.counter, 1);
1479 assert_eq!(second.timestamp, UnixNanos::from(100));
1480 assert_eq!(second.counter, 2);
1481 assert_eq!(third.timestamp, UnixNanos::from(200));
1482 assert_eq!(third.counter, 2);
1483 }
1484
1485 #[rstest]
1486 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1487 let exchange = get_exchange(
1488 Venue::new("BINANCE"),
1489 AccountType::Margin,
1490 BookType::L2_MBP,
1491 None,
1492 );
1493
1494 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1495 exchange.borrow_mut().add_instrument(instrument).unwrap();
1496
1497 let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1498 let (order2, command2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1499
1500 exchange
1501 .borrow()
1502 .cache()
1503 .borrow_mut()
1504 .add_order(order1, None, None, false)
1505 .unwrap();
1506 exchange
1507 .borrow()
1508 .cache()
1509 .borrow_mut()
1510 .add_order(order2, None, None, false)
1511 .unwrap();
1512
1513 exchange.borrow_mut().send(command1);
1514 exchange.borrow_mut().send(command2);
1515
1516 assert_eq!(exchange.borrow().message_queue.len(), 2);
1519 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1520
1521 exchange.borrow_mut().process(UnixNanos::from(300));
1523 assert_eq!(exchange.borrow().message_queue.len(), 0);
1524 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1525 }
1526
1527 #[rstest]
1528 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1529 let latency_model = StaticLatencyModel::new(
1532 UnixNanos::from(100),
1533 UnixNanos::from(200),
1534 UnixNanos::from(300),
1535 UnixNanos::from(100),
1536 );
1537 let exchange = get_exchange(
1538 Venue::new("BINANCE"),
1539 AccountType::Margin,
1540 BookType::L2_MBP,
1541 None,
1542 );
1543 exchange
1544 .borrow_mut()
1545 .set_latency_model(Box::new(latency_model));
1546
1547 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1548 exchange.borrow_mut().add_instrument(instrument).unwrap();
1549
1550 let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1551 let (order2, command2) = create_submit_order_command(UnixNanos::from(150), "O-2");
1552
1553 exchange
1554 .borrow()
1555 .cache()
1556 .borrow_mut()
1557 .add_order(order1, None, None, false)
1558 .unwrap();
1559 exchange
1560 .borrow()
1561 .cache()
1562 .borrow_mut()
1563 .add_order(order2, None, None, false)
1564 .unwrap();
1565
1566 exchange.borrow_mut().send(command1);
1567 exchange.borrow_mut().send(command2);
1568
1569 assert_eq!(exchange.borrow().message_queue.len(), 0);
1571 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1572 assert_eq!(
1574 exchange
1575 .borrow()
1576 .inflight_queue
1577 .iter()
1578 .next()
1579 .unwrap()
1580 .timestamp,
1581 UnixNanos::from(400)
1582 );
1583 assert_eq!(
1585 exchange
1586 .borrow()
1587 .inflight_queue
1588 .iter()
1589 .nth(1)
1590 .unwrap()
1591 .timestamp,
1592 UnixNanos::from(450)
1593 );
1594
1595 exchange.borrow_mut().process(UnixNanos::from(420));
1597 assert_eq!(exchange.borrow().message_queue.len(), 0);
1598 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1599 assert_eq!(
1600 exchange
1601 .borrow()
1602 .inflight_queue
1603 .iter()
1604 .next()
1605 .unwrap()
1606 .timestamp,
1607 UnixNanos::from(450)
1608 );
1609 }
1610
1611 #[rstest]
1612 fn test_process_iterates_matching_engines_after_commands(
1613 crypto_perpetual_ethusdt: CryptoPerpetual,
1614 ) {
1615 let cache = Rc::new(RefCell::new(Cache::default()));
1616 let exchange = get_exchange(
1617 Venue::new("BINANCE"),
1618 AccountType::Margin,
1619 BookType::L1_MBP,
1620 Some(cache.clone()),
1621 );
1622 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1623 let instrument_id = crypto_perpetual_ethusdt.id;
1624 exchange.borrow_mut().add_instrument(instrument).unwrap();
1625
1626 let quote = QuoteTick::new(
1627 instrument_id,
1628 Price::from("1000.00"),
1629 Price::from("1001.00"),
1630 Quantity::from("1.000"),
1631 Quantity::from("1.000"),
1632 UnixNanos::from(1),
1633 UnixNanos::from(1),
1634 );
1635 exchange.borrow_mut().process_quote_tick("e);
1636
1637 let order = OrderTestBuilder::new(OrderType::Limit)
1639 .instrument_id(instrument_id)
1640 .client_order_id(ClientOrderId::new("O-LIMIT-1"))
1641 .side(OrderSide::Buy)
1642 .quantity(Quantity::from("1.000"))
1643 .price(Price::from("999.00"))
1644 .build();
1645
1646 cache
1647 .borrow_mut()
1648 .add_order(order.clone(), None, None, false)
1649 .unwrap();
1650
1651 let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1652 TraderId::test_default(),
1653 None,
1654 StrategyId::test_default(),
1655 instrument_id,
1656 order.client_order_id(),
1657 order.init_event().clone(),
1658 None,
1659 None,
1660 None,
1661 UUID4::default(),
1662 UnixNanos::from(1),
1663 ));
1664 exchange.borrow_mut().send(command);
1665
1666 exchange.borrow_mut().process(UnixNanos::from(1));
1667
1668 let open_orders = exchange.borrow().get_open_orders(Some(instrument_id));
1669 assert_eq!(open_orders.len(), 1);
1670 assert_eq!(
1671 open_orders[0].client_order_id,
1672 ClientOrderId::new("O-LIMIT-1")
1673 );
1674 }
1675}