1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::AHashMap;
21use async_trait::async_trait;
22use nautilus_common::{
23 cache::Cache,
24 clients::ExecutionClient,
25 clock::Clock,
26 factories::OrderEventFactory,
27 messages::execution::{
28 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
29 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
30 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
31 },
32 msgbus::{self, MStr, MessagingSwitchboard, Pattern, TypedHandler},
33};
34use nautilus_core::{UnixNanos, WeakCell};
35use nautilus_execution::{
36 client::core::ExecutionClientCore,
37 matching_engine::adapter::OrderEngineAdapter,
38 models::{
39 fee::{FeeModelAny, MakerTakerFeeModel},
40 fill::FillModelAny,
41 },
42};
43use nautilus_model::{
44 accounts::AccountAny,
45 data::{Bar, OrderBookDeltas, QuoteTick, TradeTick},
46 enums::OmsType,
47 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue},
48 instruments::{Instrument, InstrumentAny},
49 orders::{Order, OrderAny},
50 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51 types::{AccountBalance, MarginBalance, Money},
52};
53
54use crate::config::SandboxExecutionClientConfig;
55
56struct SandboxInner {
60 clock: Rc<RefCell<dyn Clock>>,
62 cache: Rc<RefCell<Cache>>,
64 config: SandboxExecutionClientConfig,
66 matching_engines: AHashMap<InstrumentId, OrderEngineAdapter>,
68 next_engine_raw_id: u32,
70 balances: AHashMap<String, Money>,
72}
73
74impl SandboxInner {
75 fn ensure_matching_engine(&mut self, instrument: &InstrumentAny) {
77 let instrument_id = instrument.id();
78
79 if !self.matching_engines.contains_key(&instrument_id) {
80 let engine_config = self.config.to_matching_engine_config();
81 let fill_model = FillModelAny::default();
82 let fee_model = FeeModelAny::MakerTaker(MakerTakerFeeModel);
83 let raw_id = self.next_engine_raw_id;
84 self.next_engine_raw_id = self.next_engine_raw_id.wrapping_add(1);
85
86 let engine = OrderEngineAdapter::new(
87 instrument.clone(),
88 raw_id,
89 fill_model,
90 fee_model,
91 self.config.book_type,
92 self.config.oms_type,
93 self.config.account_type,
94 self.clock.clone(),
95 self.cache.clone(),
96 engine_config,
97 );
98
99 self.matching_engines.insert(instrument_id, engine);
100 }
101 }
102
103 fn process_quote_tick(&mut self, quote: &QuoteTick) {
105 let instrument_id = quote.instrument_id;
106
107 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
109 if let Some(instrument) = instrument {
110 self.ensure_matching_engine(&instrument);
111 if let Some(engine) = self.matching_engines.get_mut(&instrument_id) {
112 engine.get_engine_mut().process_quote_tick(quote);
113 }
114 }
115 }
116
117 fn process_trade_tick(&mut self, trade: &TradeTick) {
119 if !self.config.trade_execution {
120 return;
121 }
122
123 let instrument_id = trade.instrument_id;
124
125 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
126 if let Some(instrument) = instrument {
127 self.ensure_matching_engine(&instrument);
128 if let Some(engine) = self.matching_engines.get_mut(&instrument_id) {
129 engine.get_engine_mut().process_trade_tick(trade);
130 }
131 }
132 }
133
134 fn process_bar(&mut self, bar: &Bar) {
136 if !self.config.bar_execution {
137 return;
138 }
139
140 let instrument_id = bar.bar_type.instrument_id();
141
142 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
143 if let Some(instrument) = instrument {
144 self.ensure_matching_engine(&instrument);
145 if let Some(engine) = self.matching_engines.get_mut(&instrument_id) {
146 engine.get_engine_mut().process_bar(bar);
147 }
148 }
149 }
150}
151
152struct RegisteredHandlers {
154 quote_pattern: MStr<Pattern>,
155 quote_handler: TypedHandler<QuoteTick>,
156 trade_pattern: MStr<Pattern>,
157 trade_handler: TypedHandler<TradeTick>,
158 bar_pattern: MStr<Pattern>,
159 bar_handler: TypedHandler<Bar>,
160}
161
162pub struct SandboxExecutionClient {
168 core: RefCell<ExecutionClientCore>,
170 factory: OrderEventFactory,
172 config: SandboxExecutionClientConfig,
174 inner: Rc<RefCell<SandboxInner>>,
176 handlers: RefCell<Option<RegisteredHandlers>>,
178 clock: Rc<RefCell<dyn Clock>>,
180 cache: Rc<RefCell<Cache>>,
182}
183
184impl Debug for SandboxExecutionClient {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct(stringify!(SandboxExecutionClient))
187 .field("venue", &self.config.venue)
188 .field("account_id", &self.core.borrow().account_id)
189 .field("connected", &self.core.borrow().is_connected())
190 .field(
191 "matching_engines",
192 &self.inner.borrow().matching_engines.len(),
193 )
194 .finish()
195 }
196}
197
198impl SandboxExecutionClient {
199 #[must_use]
201 pub fn new(
202 core: ExecutionClientCore,
203 config: SandboxExecutionClientConfig,
204 clock: Rc<RefCell<dyn Clock>>,
205 cache: Rc<RefCell<Cache>>,
206 ) -> Self {
207 let mut balances = AHashMap::new();
208 for money in &config.starting_balances {
209 balances.insert(money.currency.code.to_string(), *money);
210 }
211
212 let inner = Rc::new(RefCell::new(SandboxInner {
213 clock: clock.clone(),
214 cache: cache.clone(),
215 config: config.clone(),
216 matching_engines: AHashMap::new(),
217 next_engine_raw_id: 0,
218 balances,
219 }));
220
221 let factory = OrderEventFactory::new(
222 core.trader_id,
223 core.account_id,
224 core.account_type,
225 core.base_currency,
226 );
227
228 Self {
229 core: RefCell::new(core),
230 factory,
231 config,
232 inner,
233 handlers: RefCell::new(None),
234 clock,
235 cache,
236 }
237 }
238
239 #[must_use]
241 pub const fn config(&self) -> &SandboxExecutionClientConfig {
242 &self.config
243 }
244
245 #[must_use]
247 pub fn matching_engine_count(&self) -> usize {
248 self.inner.borrow().matching_engines.len()
249 }
250
251 fn register_message_handlers(&self) {
256 if self.handlers.borrow().is_some() {
257 log::warn!("Sandbox message handlers already registered");
258 return;
259 }
260
261 let inner_weak = WeakCell::from(Rc::downgrade(&self.inner));
262 let venue = self.config.venue;
263
264 let quote_handler = {
266 let inner = inner_weak.clone();
267 TypedHandler::from(move |quote: &QuoteTick| {
268 if quote.instrument_id.venue == venue
269 && let Some(inner_rc) = inner.upgrade()
270 {
271 inner_rc.borrow_mut().process_quote_tick(quote);
272 }
273 })
274 };
275
276 let trade_handler = {
278 let inner = inner_weak.clone();
279 TypedHandler::from(move |trade: &TradeTick| {
280 if trade.instrument_id.venue == venue
281 && let Some(inner_rc) = inner.upgrade()
282 {
283 inner_rc.borrow_mut().process_trade_tick(trade);
284 }
285 })
286 };
287
288 let bar_handler = {
290 let inner = inner_weak;
291 TypedHandler::from(move |bar: &Bar| {
292 if bar.bar_type.instrument_id().venue == venue
293 && let Some(inner_rc) = inner.upgrade()
294 {
295 inner_rc.borrow_mut().process_bar(bar);
296 }
297 })
298 };
299
300 let quote_pattern: MStr<Pattern> = format!("data.quotes.{venue}.*").into();
302 let trade_pattern: MStr<Pattern> = format!("data.trades.{venue}.*").into();
303 let bar_pattern: MStr<Pattern> = "data.bars.*".into();
304
305 msgbus::subscribe_quotes(quote_pattern, quote_handler.clone(), Some(10));
306 msgbus::subscribe_trades(trade_pattern, trade_handler.clone(), Some(10));
307 msgbus::subscribe_bars(bar_pattern, bar_handler.clone(), Some(10));
308
309 *self.handlers.borrow_mut() = Some(RegisteredHandlers {
311 quote_pattern,
312 quote_handler,
313 trade_pattern,
314 trade_handler,
315 bar_pattern,
316 bar_handler,
317 });
318
319 log::info!(
320 "Sandbox registered message handlers for venue={}",
321 self.config.venue
322 );
323 }
324
325 fn deregister_message_handlers(&self) {
327 if let Some(handlers) = self.handlers.borrow_mut().take() {
328 msgbus::unsubscribe_quotes(handlers.quote_pattern, &handlers.quote_handler);
329 msgbus::unsubscribe_trades(handlers.trade_pattern, &handlers.trade_handler);
330 msgbus::unsubscribe_bars(handlers.bar_pattern, &handlers.bar_handler);
331
332 log::info!(
333 "Sandbox deregistered message handlers for venue={}",
334 self.config.venue
335 );
336 }
337 }
338
339 fn get_current_account_balances(&self) -> Vec<AccountBalance> {
341 let account_id = self.core.borrow().account_id;
342 let cache = self.cache.borrow();
343
344 if let Some(account) = cache.account(&account_id) {
346 return account.balances().into_values().collect();
347 }
348
349 self.get_account_balances()
351 }
352
353 pub fn process_quote_tick(&self, quote: &QuoteTick) -> anyhow::Result<()> {
359 let instrument_id = quote.instrument_id;
360 let instrument = self
361 .cache
362 .borrow()
363 .instrument(&instrument_id)
364 .cloned()
365 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
366
367 let mut inner = self.inner.borrow_mut();
368 inner.ensure_matching_engine(&instrument);
369 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
370 engine.get_engine_mut().process_quote_tick(quote);
371 }
372 Ok(())
373 }
374
375 pub fn process_trade_tick(&self, trade: &TradeTick) -> anyhow::Result<()> {
381 if !self.config.trade_execution {
382 return Ok(());
383 }
384
385 let instrument_id = trade.instrument_id;
386 let instrument = self
387 .cache
388 .borrow()
389 .instrument(&instrument_id)
390 .cloned()
391 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
392
393 let mut inner = self.inner.borrow_mut();
394 inner.ensure_matching_engine(&instrument);
395 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
396 engine.get_engine_mut().process_trade_tick(trade);
397 }
398 Ok(())
399 }
400
401 pub fn process_bar(&self, bar: &Bar) -> anyhow::Result<()> {
407 if !self.config.bar_execution {
408 return Ok(());
409 }
410
411 let instrument_id = bar.bar_type.instrument_id();
412 let instrument = self
413 .cache
414 .borrow()
415 .instrument(&instrument_id)
416 .cloned()
417 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
418
419 let mut inner = self.inner.borrow_mut();
420 inner.ensure_matching_engine(&instrument);
421 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
422 engine.get_engine_mut().process_bar(bar);
423 }
424 Ok(())
425 }
426
427 pub fn process_order_book_deltas(&self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
433 let instrument_id = deltas.instrument_id;
434 let instrument = self
435 .cache
436 .borrow()
437 .instrument(&instrument_id)
438 .cloned()
439 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
440
441 let mut inner = self.inner.borrow_mut();
442 inner.ensure_matching_engine(&instrument);
443 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
444 engine.get_engine_mut().process_order_book_deltas(deltas)?;
445 }
446 Ok(())
447 }
448
449 pub fn reset(&self) {
451 let mut inner = self.inner.borrow_mut();
452 for engine in inner.matching_engines.values_mut() {
453 engine.get_engine_mut().reset();
454 }
455
456 inner.balances.clear();
457 for money in &self.config.starting_balances {
458 inner
459 .balances
460 .insert(money.currency.code.to_string(), *money);
461 }
462
463 log::info!(
464 "Sandbox execution client reset: venue={}",
465 self.config.venue
466 );
467 }
468
469 fn get_account_balances(&self) -> Vec<AccountBalance> {
471 self.inner
472 .borrow()
473 .balances
474 .values()
475 .map(|money| AccountBalance::new(*money, Money::new(0.0, money.currency), *money))
476 .collect()
477 }
478
479 fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
480 self.cache
481 .borrow()
482 .order(client_order_id)
483 .cloned()
484 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
485 }
486}
487
488#[async_trait(?Send)]
489impl ExecutionClient for SandboxExecutionClient {
490 fn is_connected(&self) -> bool {
491 self.core.borrow().is_connected()
492 }
493
494 fn client_id(&self) -> ClientId {
495 self.core.borrow().client_id
496 }
497
498 fn account_id(&self) -> AccountId {
499 self.core.borrow().account_id
500 }
501
502 fn venue(&self) -> Venue {
503 self.core.borrow().venue
504 }
505
506 fn oms_type(&self) -> OmsType {
507 self.config.oms_type
508 }
509
510 fn get_account(&self) -> Option<AccountAny> {
511 let account_id = self.core.borrow().account_id;
512 self.cache.borrow().account(&account_id).cloned()
513 }
514
515 fn generate_account_state(
516 &self,
517 balances: Vec<AccountBalance>,
518 margins: Vec<MarginBalance>,
519 reported: bool,
520 ts_event: UnixNanos,
521 ) -> anyhow::Result<()> {
522 let ts_init = self.clock.borrow().timestamp_ns();
523 let state = self
524 .factory
525 .generate_account_state(balances, margins, reported, ts_event, ts_init);
526 let endpoint = MessagingSwitchboard::portfolio_update_account();
527 msgbus::send_account_state(endpoint, &state);
528 Ok(())
529 }
530
531 fn start(&mut self) -> anyhow::Result<()> {
532 if self.core.borrow().is_started() {
533 return Ok(());
534 }
535
536 self.register_message_handlers();
538
539 self.core.borrow().set_started();
540 let core = self.core.borrow();
541 log::info!(
542 "Sandbox execution client started: venue={}, account_id={}, oms_type={:?}, account_type={:?}",
543 self.config.venue,
544 core.account_id,
545 self.config.oms_type,
546 self.config.account_type,
547 );
548 Ok(())
549 }
550
551 fn stop(&mut self) -> anyhow::Result<()> {
552 if self.core.borrow().is_stopped() {
553 return Ok(());
554 }
555
556 self.deregister_message_handlers();
558
559 self.core.borrow().set_stopped();
560 self.core.borrow().set_disconnected();
561 log::info!(
562 "Sandbox execution client stopped: venue={}",
563 self.config.venue
564 );
565 Ok(())
566 }
567
568 async fn connect(&mut self) -> anyhow::Result<()> {
569 if self.core.borrow().is_connected() {
570 return Ok(());
571 }
572
573 let balances = self.get_account_balances();
574 let ts_event = self.clock.borrow().timestamp_ns();
575 self.generate_account_state(balances, vec![], false, ts_event)?;
576
577 self.core.borrow().set_connected();
578 log::info!(
579 "Sandbox execution client connected: venue={}",
580 self.config.venue
581 );
582 Ok(())
583 }
584
585 async fn disconnect(&mut self) -> anyhow::Result<()> {
586 if self.core.borrow().is_disconnected() {
587 return Ok(());
588 }
589
590 self.core.borrow().set_disconnected();
591 log::info!(
592 "Sandbox execution client disconnected: venue={}",
593 self.config.venue
594 );
595 Ok(())
596 }
597
598 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
599 let mut order = self.get_order(&cmd.client_order_id)?;
600
601 if order.is_closed() {
602 log::warn!("Cannot submit closed order {}", order.client_order_id());
603 return Ok(());
604 }
605
606 let ts_init = self.clock.borrow().timestamp_ns();
607 let event = self.factory.generate_order_submitted(&order, ts_init);
608 let endpoint = MessagingSwitchboard::exec_engine_process();
609 msgbus::send_order_event(endpoint, event);
610
611 let instrument_id = order.instrument_id();
612 let instrument = self
613 .cache
614 .borrow()
615 .instrument(&instrument_id)
616 .cloned()
617 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
618
619 let mut inner = self.inner.borrow_mut();
620 inner.ensure_matching_engine(&instrument);
621
622 let cache = self.cache.borrow();
624 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
625 if let Some(quote) = cache.quote(&instrument_id) {
626 engine.get_engine_mut().process_quote_tick(quote);
627 }
628 if self.config.trade_execution
629 && let Some(trade) = cache.trade(&instrument_id)
630 {
631 engine.get_engine_mut().process_trade_tick(trade);
632 }
633 }
634 drop(cache);
635
636 let account_id = self.core.borrow().account_id;
637 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
638 engine
639 .get_engine_mut()
640 .process_order(&mut order, account_id);
641 }
642
643 Ok(())
644 }
645
646 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
647 let ts_init = self.clock.borrow().timestamp_ns();
648 let endpoint = MessagingSwitchboard::exec_engine_process();
649
650 let orders: Vec<OrderAny> = self
651 .cache
652 .borrow()
653 .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
654
655 for order in &orders {
656 if order.is_closed() {
657 log::warn!("Cannot submit closed order {}", order.client_order_id());
658 continue;
659 }
660
661 let event = self.factory.generate_order_submitted(order, ts_init);
662 msgbus::send_order_event(endpoint, event);
663 }
664
665 let account_id = self.core.borrow().account_id;
666 for order in &orders {
667 if order.is_closed() {
668 continue;
669 }
670
671 let instrument_id = order.instrument_id();
672 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
673
674 if let Some(instrument) = instrument {
675 let mut inner = self.inner.borrow_mut();
676 inner.ensure_matching_engine(&instrument);
677
678 let cache = self.cache.borrow();
680 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
681 if let Some(quote) = cache.quote(&instrument_id) {
682 engine.get_engine_mut().process_quote_tick(quote);
683 }
684 if self.config.trade_execution
685 && let Some(trade) = cache.trade(&instrument_id)
686 {
687 engine.get_engine_mut().process_trade_tick(trade);
688 }
689 }
690 drop(cache);
691
692 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
693 let mut order_clone = order.clone();
694 engine
695 .get_engine_mut()
696 .process_order(&mut order_clone, account_id);
697 }
698 }
699 }
700
701 Ok(())
702 }
703
704 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
705 let instrument_id = cmd.instrument_id;
706 let account_id = self.core.borrow().account_id;
707
708 let mut inner = self.inner.borrow_mut();
709 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
710 engine.get_engine_mut().process_modify(cmd, account_id);
711 }
712 Ok(())
713 }
714
715 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
716 let instrument_id = cmd.instrument_id;
717 let account_id = self.core.borrow().account_id;
718
719 let mut inner = self.inner.borrow_mut();
720 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
721 engine.get_engine_mut().process_cancel(cmd, account_id);
722 }
723 Ok(())
724 }
725
726 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
727 let instrument_id = cmd.instrument_id;
728 let account_id = self.core.borrow().account_id;
729
730 let mut inner = self.inner.borrow_mut();
731 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
732 engine.get_engine_mut().process_cancel_all(cmd, account_id);
733 }
734 Ok(())
735 }
736
737 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
738 let instrument_id = cmd.instrument_id;
739 let account_id = self.core.borrow().account_id;
740
741 let mut inner = self.inner.borrow_mut();
742 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
743 engine
744 .get_engine_mut()
745 .process_batch_cancel(cmd, account_id);
746 }
747 Ok(())
748 }
749
750 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
751 let balances = self.get_current_account_balances();
752 let ts_event = self.clock.borrow().timestamp_ns();
753 self.generate_account_state(balances, vec![], false, ts_event)?;
754 Ok(())
755 }
756
757 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
758 Ok(())
760 }
761
762 async fn generate_order_status_report(
763 &self,
764 _cmd: &GenerateOrderStatusReport,
765 ) -> anyhow::Result<Option<OrderStatusReport>> {
766 Ok(None)
768 }
769
770 async fn generate_order_status_reports(
771 &self,
772 _cmd: &GenerateOrderStatusReports,
773 ) -> anyhow::Result<Vec<OrderStatusReport>> {
774 Ok(Vec::new())
776 }
777
778 async fn generate_fill_reports(
779 &self,
780 _cmd: GenerateFillReports,
781 ) -> anyhow::Result<Vec<FillReport>> {
782 Ok(Vec::new())
784 }
785
786 async fn generate_position_status_reports(
787 &self,
788 _cmd: &GeneratePositionStatusReports,
789 ) -> anyhow::Result<Vec<PositionStatusReport>> {
790 Ok(Vec::new())
792 }
793
794 async fn generate_mass_status(
795 &self,
796 _lookback_mins: Option<u64>,
797 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
798 Ok(None)
800 }
801}