1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::AHashMap;
21use async_trait::async_trait;
22use nautilus_common::{
23 cache::Cache,
24 clients::ExecutionClient,
25 clock::Clock,
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
29 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
30 },
31 msgbus::{self, MStr, Pattern, TypedHandler},
32};
33use nautilus_core::{UnixNanos, WeakCell};
34use nautilus_execution::{
35 client::base::ExecutionClientCore,
36 matching_engine::adapter::OrderEngineAdapter,
37 models::{
38 fee::{FeeModelAny, MakerTakerFeeModel},
39 fill::FillModel,
40 },
41};
42use nautilus_model::{
43 accounts::AccountAny,
44 data::{Bar, OrderBookDeltas, QuoteTick, TradeTick},
45 enums::OmsType,
46 identifiers::{AccountId, ClientId, InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orders::Order,
49 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
50 types::{AccountBalance, MarginBalance, Money},
51};
52
53use crate::config::SandboxExecutionClientConfig;
54
55struct SandboxInner {
59 matching_engines: AHashMap<InstrumentId, OrderEngineAdapter>,
61 next_engine_raw_id: u32,
63 balances: AHashMap<String, Money>,
65 clock: Rc<RefCell<dyn Clock>>,
67 cache: Rc<RefCell<Cache>>,
69 config: SandboxExecutionClientConfig,
71}
72
73impl SandboxInner {
74 fn ensure_matching_engine(&mut self, instrument: &InstrumentAny) {
76 let instrument_id = instrument.id();
77
78 if !self.matching_engines.contains_key(&instrument_id) {
79 let engine_config = self.config.to_matching_engine_config();
80 let fill_model = FillModel::default();
81 let fee_model = FeeModelAny::MakerTaker(MakerTakerFeeModel);
82 let raw_id = self.next_engine_raw_id;
83 self.next_engine_raw_id = self.next_engine_raw_id.wrapping_add(1);
84
85 let engine = OrderEngineAdapter::new(
86 instrument.clone(),
87 raw_id,
88 fill_model,
89 fee_model,
90 self.config.book_type,
91 self.config.oms_type,
92 self.config.account_type,
93 self.clock.clone(),
94 self.cache.clone(),
95 engine_config,
96 );
97
98 self.matching_engines.insert(instrument_id, engine);
99 }
100 }
101
102 fn process_quote_tick(&mut self, quote: &QuoteTick) {
104 let instrument_id = quote.instrument_id;
105
106 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
108 if let Some(instrument) = instrument {
109 self.ensure_matching_engine(&instrument);
110 if let Some(engine) = self.matching_engines.get_mut(&instrument_id) {
111 engine.get_engine_mut().process_quote_tick(quote);
112 }
113 }
114 }
115
116 fn process_trade_tick(&mut self, trade: &TradeTick) {
118 if !self.config.trade_execution {
119 return;
120 }
121
122 let instrument_id = trade.instrument_id;
123
124 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
125 if let Some(instrument) = instrument {
126 self.ensure_matching_engine(&instrument);
127 if let Some(engine) = self.matching_engines.get_mut(&instrument_id) {
128 engine.get_engine_mut().process_trade_tick(trade);
129 }
130 }
131 }
132
133 fn process_bar(&mut self, bar: &Bar) {
135 if !self.config.bar_execution {
136 return;
137 }
138
139 let instrument_id = bar.bar_type.instrument_id();
140
141 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
142 if let Some(instrument) = instrument {
143 self.ensure_matching_engine(&instrument);
144 if let Some(engine) = self.matching_engines.get_mut(&instrument_id) {
145 engine.get_engine_mut().process_bar(bar);
146 }
147 }
148 }
149}
150
151struct RegisteredHandlers {
153 quote_pattern: MStr<Pattern>,
154 quote_handler: TypedHandler<QuoteTick>,
155 trade_pattern: MStr<Pattern>,
156 trade_handler: TypedHandler<TradeTick>,
157 bar_pattern: MStr<Pattern>,
158 bar_handler: TypedHandler<Bar>,
159}
160
161pub struct SandboxExecutionClient {
167 core: RefCell<ExecutionClientCore>,
169 config: SandboxExecutionClientConfig,
171 inner: Rc<RefCell<SandboxInner>>,
173 handlers: RefCell<Option<RegisteredHandlers>>,
175 started: RefCell<bool>,
177 connected: RefCell<bool>,
179 clock: Rc<RefCell<dyn Clock>>,
181 cache: Rc<RefCell<Cache>>,
183}
184
185impl Debug for SandboxExecutionClient {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct(stringify!(SandboxExecutionClient))
188 .field("venue", &self.config.venue)
189 .field("account_id", &self.core.borrow().account_id)
190 .field("connected", &*self.connected.borrow())
191 .field(
192 "matching_engines",
193 &self.inner.borrow().matching_engines.len(),
194 )
195 .finish()
196 }
197}
198
199impl SandboxExecutionClient {
200 #[must_use]
202 pub fn new(
203 core: ExecutionClientCore,
204 config: SandboxExecutionClientConfig,
205 clock: Rc<RefCell<dyn Clock>>,
206 cache: Rc<RefCell<Cache>>,
207 ) -> Self {
208 let mut balances = AHashMap::new();
209 for money in &config.starting_balances {
210 balances.insert(money.currency.code.to_string(), *money);
211 }
212
213 let inner = Rc::new(RefCell::new(SandboxInner {
214 matching_engines: AHashMap::new(),
215 next_engine_raw_id: 0,
216 balances,
217 clock: clock.clone(),
218 cache: cache.clone(),
219 config: config.clone(),
220 }));
221
222 Self {
223 core: RefCell::new(core),
224 config,
225 inner,
226 handlers: RefCell::new(None),
227 started: RefCell::new(false),
228 connected: RefCell::new(false),
229 clock,
230 cache,
231 }
232 }
233
234 #[must_use]
236 pub const fn config(&self) -> &SandboxExecutionClientConfig {
237 &self.config
238 }
239
240 #[must_use]
242 pub fn matching_engine_count(&self) -> usize {
243 self.inner.borrow().matching_engines.len()
244 }
245
246 fn register_message_handlers(&self) {
251 if self.handlers.borrow().is_some() {
252 log::warn!("Sandbox message handlers already registered");
253 return;
254 }
255
256 let inner_weak = WeakCell::from(Rc::downgrade(&self.inner));
257 let venue = self.config.venue;
258
259 let quote_handler = {
261 let inner = inner_weak.clone();
262 TypedHandler::from(move |quote: &QuoteTick| {
263 if quote.instrument_id.venue == venue
264 && let Some(inner_rc) = inner.upgrade()
265 {
266 inner_rc.borrow_mut().process_quote_tick(quote);
267 }
268 })
269 };
270
271 let trade_handler = {
273 let inner = inner_weak.clone();
274 TypedHandler::from(move |trade: &TradeTick| {
275 if trade.instrument_id.venue == venue
276 && let Some(inner_rc) = inner.upgrade()
277 {
278 inner_rc.borrow_mut().process_trade_tick(trade);
279 }
280 })
281 };
282
283 let bar_handler = {
285 let inner = inner_weak;
286 TypedHandler::from(move |bar: &Bar| {
287 if bar.bar_type.instrument_id().venue == venue
288 && let Some(inner_rc) = inner.upgrade()
289 {
290 inner_rc.borrow_mut().process_bar(bar);
291 }
292 })
293 };
294
295 let quote_pattern: MStr<Pattern> = format!("data.quotes.{venue}.*").into();
297 let trade_pattern: MStr<Pattern> = format!("data.trades.{venue}.*").into();
298 let bar_pattern: MStr<Pattern> = "data.bars.*".into();
299
300 msgbus::subscribe_quotes(quote_pattern, quote_handler.clone(), Some(10));
301 msgbus::subscribe_trades(trade_pattern, trade_handler.clone(), Some(10));
302 msgbus::subscribe_bars(bar_pattern, bar_handler.clone(), Some(10));
303
304 *self.handlers.borrow_mut() = Some(RegisteredHandlers {
306 quote_pattern,
307 quote_handler,
308 trade_pattern,
309 trade_handler,
310 bar_pattern,
311 bar_handler,
312 });
313
314 log::info!(
315 "Sandbox registered message handlers for venue={}",
316 self.config.venue
317 );
318 }
319
320 fn deregister_message_handlers(&self) {
322 if let Some(handlers) = self.handlers.borrow_mut().take() {
323 msgbus::unsubscribe_quotes(handlers.quote_pattern, &handlers.quote_handler);
324 msgbus::unsubscribe_trades(handlers.trade_pattern, &handlers.trade_handler);
325 msgbus::unsubscribe_bars(handlers.bar_pattern, &handlers.bar_handler);
326
327 log::info!(
328 "Sandbox deregistered message handlers for venue={}",
329 self.config.venue
330 );
331 }
332 }
333
334 fn get_current_account_balances(&self) -> Vec<AccountBalance> {
336 let account_id = self.core.borrow().account_id;
337 let cache = self.cache.borrow();
338
339 if let Some(account) = cache.account(&account_id) {
341 return account.balances().into_values().collect();
342 }
343
344 self.get_account_balances()
346 }
347
348 pub fn process_quote_tick(&self, quote: &QuoteTick) -> anyhow::Result<()> {
354 let instrument_id = quote.instrument_id;
355 let instrument = self
356 .cache
357 .borrow()
358 .instrument(&instrument_id)
359 .cloned()
360 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
361
362 let mut inner = self.inner.borrow_mut();
363 inner.ensure_matching_engine(&instrument);
364 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
365 engine.get_engine_mut().process_quote_tick(quote);
366 }
367 Ok(())
368 }
369
370 pub fn process_trade_tick(&self, trade: &TradeTick) -> anyhow::Result<()> {
376 if !self.config.trade_execution {
377 return Ok(());
378 }
379
380 let instrument_id = trade.instrument_id;
381 let instrument = self
382 .cache
383 .borrow()
384 .instrument(&instrument_id)
385 .cloned()
386 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
387
388 let mut inner = self.inner.borrow_mut();
389 inner.ensure_matching_engine(&instrument);
390 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
391 engine.get_engine_mut().process_trade_tick(trade);
392 }
393 Ok(())
394 }
395
396 pub fn process_bar(&self, bar: &Bar) -> anyhow::Result<()> {
402 if !self.config.bar_execution {
403 return Ok(());
404 }
405
406 let instrument_id = bar.bar_type.instrument_id();
407 let instrument = self
408 .cache
409 .borrow()
410 .instrument(&instrument_id)
411 .cloned()
412 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
413
414 let mut inner = self.inner.borrow_mut();
415 inner.ensure_matching_engine(&instrument);
416 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
417 engine.get_engine_mut().process_bar(bar);
418 }
419 Ok(())
420 }
421
422 pub fn process_order_book_deltas(&self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
428 let instrument_id = deltas.instrument_id;
429 let instrument = self
430 .cache
431 .borrow()
432 .instrument(&instrument_id)
433 .cloned()
434 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
435
436 let mut inner = self.inner.borrow_mut();
437 inner.ensure_matching_engine(&instrument);
438 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
439 engine.get_engine_mut().process_order_book_deltas(deltas)?;
440 }
441 Ok(())
442 }
443
444 pub fn reset(&self) {
446 let mut inner = self.inner.borrow_mut();
447 for engine in inner.matching_engines.values_mut() {
448 engine.get_engine_mut().reset();
449 }
450
451 inner.balances.clear();
452 for money in &self.config.starting_balances {
453 inner
454 .balances
455 .insert(money.currency.code.to_string(), *money);
456 }
457
458 log::info!(
459 "Sandbox execution client reset: venue={}",
460 self.config.venue
461 );
462 }
463
464 fn get_account_balances(&self) -> Vec<AccountBalance> {
466 self.inner
467 .borrow()
468 .balances
469 .values()
470 .map(|money| AccountBalance::new(*money, Money::new(0.0, money.currency), *money))
471 .collect()
472 }
473}
474
475#[async_trait(?Send)]
476impl ExecutionClient for SandboxExecutionClient {
477 fn is_connected(&self) -> bool {
478 *self.connected.borrow()
479 }
480
481 fn client_id(&self) -> ClientId {
482 self.core.borrow().client_id
483 }
484
485 fn account_id(&self) -> AccountId {
486 self.core.borrow().account_id
487 }
488
489 fn venue(&self) -> Venue {
490 self.core.borrow().venue
491 }
492
493 fn oms_type(&self) -> OmsType {
494 self.config.oms_type
495 }
496
497 fn get_account(&self) -> Option<AccountAny> {
498 self.core.borrow().get_account()
499 }
500
501 fn generate_account_state(
502 &self,
503 balances: Vec<AccountBalance>,
504 margins: Vec<MarginBalance>,
505 reported: bool,
506 ts_event: UnixNanos,
507 ) -> anyhow::Result<()> {
508 self.core
509 .borrow()
510 .generate_account_state(balances, margins, reported, ts_event)
511 }
512
513 fn start(&mut self) -> anyhow::Result<()> {
514 if *self.started.borrow() {
515 return Ok(());
516 }
517
518 self.register_message_handlers();
520
521 *self.started.borrow_mut() = true;
522 let core = self.core.borrow();
523 log::info!(
524 "Sandbox execution client started: venue={}, account_id={}, oms_type={:?}, account_type={:?}",
525 self.config.venue,
526 core.account_id,
527 self.config.oms_type,
528 self.config.account_type,
529 );
530 Ok(())
531 }
532
533 fn stop(&mut self) -> anyhow::Result<()> {
534 if !*self.started.borrow() {
535 return Ok(());
536 }
537
538 self.deregister_message_handlers();
540
541 *self.started.borrow_mut() = false;
542 *self.connected.borrow_mut() = false;
543 log::info!(
544 "Sandbox execution client stopped: venue={}",
545 self.config.venue
546 );
547 Ok(())
548 }
549
550 async fn connect(&mut self) -> anyhow::Result<()> {
551 if *self.connected.borrow() {
552 return Ok(());
553 }
554
555 let balances = self.get_account_balances();
556 let ts_event = self.clock.borrow().timestamp_ns();
557 self.generate_account_state(balances, vec![], false, ts_event)?;
558
559 *self.connected.borrow_mut() = true;
560 self.core.borrow_mut().set_connected(true);
561 log::info!(
562 "Sandbox execution client connected: venue={}",
563 self.config.venue
564 );
565 Ok(())
566 }
567
568 async fn disconnect(&mut self) -> anyhow::Result<()> {
569 if !*self.connected.borrow() {
570 return Ok(());
571 }
572
573 *self.connected.borrow_mut() = false;
574 self.core.borrow_mut().set_connected(false);
575 log::info!(
576 "Sandbox execution client disconnected: venue={}",
577 self.config.venue
578 );
579 Ok(())
580 }
581
582 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
583 let core = self.core.borrow();
584 let mut order = core.get_order(&cmd.client_order_id)?;
585
586 if order.is_closed() {
587 log::warn!("Cannot submit closed order {}", order.client_order_id());
588 return Ok(());
589 }
590
591 core.generate_order_submitted(
592 order.strategy_id(),
593 order.instrument_id(),
594 order.client_order_id(),
595 cmd.ts_init,
596 );
597
598 let instrument_id = order.instrument_id();
599 let instrument = self
600 .cache
601 .borrow()
602 .instrument(&instrument_id)
603 .cloned()
604 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
605
606 drop(core); let mut inner = self.inner.borrow_mut();
609 inner.ensure_matching_engine(&instrument);
610
611 let cache = self.cache.borrow();
613 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
614 if let Some(quote) = cache.quote(&instrument_id) {
615 engine.get_engine_mut().process_quote_tick(quote);
616 }
617 if self.config.trade_execution
618 && let Some(trade) = cache.trade(&instrument_id)
619 {
620 engine.get_engine_mut().process_trade_tick(trade);
621 }
622 }
623 drop(cache);
624
625 let account_id = self.core.borrow().account_id;
626 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
627 engine
628 .get_engine_mut()
629 .process_order(&mut order, account_id);
630 }
631
632 Ok(())
633 }
634
635 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
636 let core = self.core.borrow();
637
638 for order in &cmd.order_list.orders {
639 if order.is_closed() {
640 log::warn!("Cannot submit closed order {}", order.client_order_id());
641 continue;
642 }
643
644 core.generate_order_submitted(
645 order.strategy_id(),
646 order.instrument_id(),
647 order.client_order_id(),
648 cmd.ts_init,
649 );
650 }
651
652 drop(core); let account_id = self.core.borrow().account_id;
655 for order in &cmd.order_list.orders {
656 if order.is_closed() {
657 continue;
658 }
659
660 let instrument_id = order.instrument_id();
661 let instrument = self.cache.borrow().instrument(&instrument_id).cloned();
662
663 if let Some(instrument) = instrument {
664 let mut inner = self.inner.borrow_mut();
665 inner.ensure_matching_engine(&instrument);
666
667 let cache = self.cache.borrow();
669 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
670 if let Some(quote) = cache.quote(&instrument_id) {
671 engine.get_engine_mut().process_quote_tick(quote);
672 }
673 if self.config.trade_execution
674 && let Some(trade) = cache.trade(&instrument_id)
675 {
676 engine.get_engine_mut().process_trade_tick(trade);
677 }
678 }
679 drop(cache);
680
681 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
682 let mut order_clone = order.clone();
683 engine
684 .get_engine_mut()
685 .process_order(&mut order_clone, account_id);
686 }
687 }
688 }
689
690 Ok(())
691 }
692
693 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
694 let instrument_id = cmd.instrument_id;
695 let account_id = self.core.borrow().account_id;
696
697 let mut inner = self.inner.borrow_mut();
698 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
699 engine.get_engine_mut().process_modify(cmd, account_id);
700 }
701 Ok(())
702 }
703
704 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
705 let instrument_id = cmd.instrument_id;
706 let account_id = self.core.borrow().account_id;
707
708 let mut inner = self.inner.borrow_mut();
709 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
710 engine.get_engine_mut().process_cancel(cmd, account_id);
711 }
712 Ok(())
713 }
714
715 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
716 let instrument_id = cmd.instrument_id;
717 let account_id = self.core.borrow().account_id;
718
719 let mut inner = self.inner.borrow_mut();
720 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
721 engine.get_engine_mut().process_cancel_all(cmd, account_id);
722 }
723 Ok(())
724 }
725
726 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
727 let instrument_id = cmd.instrument_id;
728 let account_id = self.core.borrow().account_id;
729
730 let mut inner = self.inner.borrow_mut();
731 if let Some(engine) = inner.matching_engines.get_mut(&instrument_id) {
732 engine
733 .get_engine_mut()
734 .process_batch_cancel(cmd, account_id);
735 }
736 Ok(())
737 }
738
739 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
740 let balances = self.get_current_account_balances();
741 let ts_event = self.clock.borrow().timestamp_ns();
742 self.generate_account_state(balances, vec![], false, ts_event)?;
743 Ok(())
744 }
745
746 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
747 Ok(())
749 }
750
751 async fn generate_order_status_report(
752 &self,
753 _cmd: &GenerateOrderStatusReport,
754 ) -> anyhow::Result<Option<OrderStatusReport>> {
755 Ok(None)
757 }
758
759 async fn generate_order_status_reports(
760 &self,
761 _cmd: &GenerateOrderStatusReports,
762 ) -> anyhow::Result<Vec<OrderStatusReport>> {
763 Ok(Vec::new())
765 }
766
767 async fn generate_fill_reports(
768 &self,
769 _cmd: GenerateFillReports,
770 ) -> anyhow::Result<Vec<FillReport>> {
771 Ok(Vec::new())
773 }
774
775 async fn generate_position_status_reports(
776 &self,
777 _cmd: &GeneratePositionStatusReports,
778 ) -> anyhow::Result<Vec<PositionStatusReport>> {
779 Ok(Vec::new())
781 }
782
783 async fn generate_mass_status(
784 &self,
785 _lookback_mins: Option<u64>,
786 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
787 Ok(None)
789 }
790}