1use std::{
19 num::NonZeroUsize,
20 ops::{Deref, DerefMut},
21 time::Duration,
22};
23
24use ahash::AHashMap;
25use nautilus_common::{
26 actor::{DataActor, DataActorConfig, DataActorCore},
27 enums::LogColor,
28 log_info,
29 timer::TimeEvent,
30};
31use nautilus_model::{
32 data::{
33 Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
34 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, bar::BarType,
35 },
36 enums::BookType,
37 identifiers::{ClientId, InstrumentId},
38 instruments::InstrumentAny,
39 orderbook::OrderBook,
40};
41
/// Configuration for a [`DataTester`] actor.
///
/// The `subscribe_*` flags select which subscriptions are made when the actor starts
/// (and undone when it stops, if `can_unsubscribe` is set).
#[derive(Debug, Clone)]
pub struct DataTesterConfig {
    /// Base actor configuration; a clone is passed to `DataActorCore::new`.
    pub base: DataActorConfig,
    /// Instruments the per-instrument subscriptions are made for.
    pub instrument_ids: Vec<InstrumentId>,
    /// Client ID forwarded to every request, subscribe and unsubscribe call.
    pub client_id: Option<ClientId>,
    /// Bar types subscribed to when `subscribe_bars` is set.
    pub bar_types: Option<Vec<BarType>>,
    /// Subscribe to order book deltas for each instrument.
    pub subscribe_book_deltas: bool,
    /// NOTE(review): not read by any code visible in this file section - confirm whether
    /// depth subscriptions are still pending.
    pub subscribe_book_depth: bool,
    /// Subscribe to the order book at the interval given by `book_interval_ms`.
    pub subscribe_book_at_interval: bool,
    /// Subscribe to quotes for each instrument.
    pub subscribe_quotes: bool,
    /// Subscribe to trades for each instrument.
    pub subscribe_trades: bool,
    /// Subscribe to mark prices for each instrument.
    pub subscribe_mark_prices: bool,
    /// Subscribe to index prices for each instrument.
    pub subscribe_index_prices: bool,
    /// Subscribe to funding rates for each instrument.
    pub subscribe_funding_rates: bool,
    /// Subscribe to every bar type listed in `bar_types`.
    pub subscribe_bars: bool,
    /// Subscribe to instrument definitions for each instrument.
    pub subscribe_instrument: bool,
    /// Subscribe to instrument status updates for each instrument.
    pub subscribe_instrument_status: bool,
    /// Subscribe to instrument close updates for each instrument.
    pub subscribe_instrument_close: bool,
    /// When `false`, stopping the actor returns without unsubscribing from anything.
    pub can_unsubscribe: bool,
    /// On start, request instruments once per distinct venue found in `instrument_ids`.
    pub request_instruments: bool,
    /// NOTE(review): not read by any code visible in this file section - confirm.
    pub request_quotes: bool,
    /// NOTE(review): not read by any code visible in this file section - confirm.
    pub request_trades: bool,
    /// NOTE(review): not read by any code visible in this file section - confirm.
    pub request_bars: bool,
    /// Book type used for book subscriptions and for locally created order books.
    pub book_type: BookType,
    /// Depth argument forwarded to the interval book subscription.
    pub book_depth: Option<NonZeroUsize>,
    /// Interval forwarded to the interval book subscribe/unsubscribe calls
    /// (presumably milliseconds, per the name).
    pub book_interval_ms: NonZeroUsize,
    /// Number of levels passed to `OrderBook::pprint` when a managed book is logged.
    pub book_levels_to_print: usize,
    /// Forwarded to the book deltas subscription; when set, the actor also keeps a local
    /// `OrderBook` per instrument and applies incoming deltas to it.
    pub manage_book: bool,
    /// Log every received data item.
    pub log_data: bool,
    /// Period in seconds of the `STATS-TIMER` timer set on start; `0` disables the timer.
    pub stats_interval_secs: u64,
}
108
109impl DataTesterConfig {
110 #[must_use]
118 pub fn new(
119 client_id: ClientId,
120 instrument_ids: Vec<InstrumentId>,
121 subscribe_quotes: bool,
122 subscribe_trades: bool,
123 ) -> Self {
124 Self {
125 base: DataActorConfig::default(),
126 instrument_ids,
127 client_id: Some(client_id),
128 bar_types: None,
129 subscribe_book_deltas: false,
130 subscribe_book_depth: false,
131 subscribe_book_at_interval: false,
132 subscribe_quotes,
133 subscribe_trades,
134 subscribe_mark_prices: false,
135 subscribe_index_prices: false,
136 subscribe_funding_rates: false,
137 subscribe_bars: false,
138 subscribe_instrument: false,
139 subscribe_instrument_status: false,
140 subscribe_instrument_close: false,
141 can_unsubscribe: true,
142 request_instruments: false,
143 request_quotes: false,
144 request_trades: false,
145 request_bars: false,
146 book_type: BookType::L2_MBP,
147 book_depth: None,
148 book_interval_ms: NonZeroUsize::new(1000).unwrap(),
149 book_levels_to_print: 10,
150 manage_book: false,
151 log_data: true,
152 stats_interval_secs: 5,
153 }
154 }
155}
156
157impl Default for DataTesterConfig {
158 fn default() -> Self {
159 Self {
160 base: DataActorConfig::default(),
161 instrument_ids: Vec::new(),
162 client_id: None,
163 bar_types: None,
164 subscribe_book_deltas: false,
165 subscribe_book_depth: false,
166 subscribe_book_at_interval: false,
167 subscribe_quotes: false,
168 subscribe_trades: false,
169 subscribe_mark_prices: false,
170 subscribe_index_prices: false,
171 subscribe_funding_rates: false,
172 subscribe_bars: false,
173 subscribe_instrument: false,
174 subscribe_instrument_status: false,
175 subscribe_instrument_close: false,
176 can_unsubscribe: true,
177 request_instruments: false,
178 request_quotes: false,
179 request_trades: false,
180 request_bars: false,
181 book_type: BookType::L2_MBP,
182 book_depth: None,
183 book_interval_ms: NonZeroUsize::new(1000).unwrap(),
184 book_levels_to_print: 10,
185 manage_book: false,
186 log_data: true,
187 stats_interval_secs: 5,
188 }
189 }
190}
191
/// Data actor that makes the subscriptions selected in its [`DataTesterConfig`] and logs
/// the data it receives.
#[derive(Debug)]
pub struct DataTester {
    /// Underlying actor core, exposed through `Deref`/`DerefMut`.
    core: DataActorCore,
    /// Configuration supplied at construction.
    config: DataTesterConfig,
    /// Locally maintained order books keyed by instrument; filled on start when both
    /// `subscribe_book_deltas` and `manage_book` are set.
    books: AHashMap<InstrumentId, OrderBook>,
}
206
207impl Deref for DataTester {
208 type Target = DataActorCore;
209
210 fn deref(&self) -> &Self::Target {
211 &self.core
212 }
213}
214
215impl DerefMut for DataTester {
216 fn deref_mut(&mut self) -> &mut Self::Target {
217 &mut self.core
218 }
219}
220
221impl DataActor for DataTester {
222 fn on_start(&mut self) -> anyhow::Result<()> {
223 let instrument_ids = self.config.instrument_ids.clone();
224 let client_id = self.config.client_id;
225 let stats_interval_secs = self.config.stats_interval_secs;
226
227 if self.config.request_instruments {
229 let mut venues = std::collections::HashSet::new();
230 for instrument_id in &instrument_ids {
231 venues.insert(instrument_id.venue);
232 }
233
234 for venue in venues {
235 let _ = self.request_instruments(Some(venue), None, None, client_id, None);
236 }
237 }
238
239 for instrument_id in instrument_ids {
241 if self.config.subscribe_instrument {
242 self.subscribe_instrument(instrument_id, client_id, None);
243 }
244
245 if self.config.subscribe_book_deltas {
246 self.subscribe_book_deltas(
247 instrument_id,
248 self.config.book_type,
249 None,
250 client_id,
251 self.config.manage_book,
252 None,
253 );
254
255 if self.config.manage_book {
256 let book = OrderBook::new(instrument_id, self.config.book_type);
257 self.books.insert(instrument_id, book);
258 }
259 }
260
261 if self.config.subscribe_book_at_interval {
262 self.subscribe_book_at_interval(
263 instrument_id,
264 self.config.book_type,
265 self.config.book_depth,
266 self.config.book_interval_ms,
267 client_id,
268 None,
269 );
270 }
271
272 if self.config.subscribe_quotes {
284 self.subscribe_quotes(instrument_id, client_id, None);
285 }
286
287 if self.config.subscribe_trades {
288 self.subscribe_trades(instrument_id, client_id, None);
289 }
290
291 if self.config.subscribe_mark_prices {
292 self.subscribe_mark_prices(instrument_id, client_id, None);
293 }
294
295 if self.config.subscribe_index_prices {
296 self.subscribe_index_prices(instrument_id, client_id, None);
297 }
298
299 if self.config.subscribe_funding_rates {
300 self.subscribe_funding_rates(instrument_id, client_id, None);
301 }
302
303 if self.config.subscribe_instrument_status {
304 self.subscribe_instrument_status(instrument_id, client_id, None);
305 }
306
307 if self.config.subscribe_instrument_close {
308 self.subscribe_instrument_close(instrument_id, client_id, None);
309 }
310
311 }
321
322 if let Some(bar_types) = self.config.bar_types.clone() {
324 for bar_type in bar_types {
325 if self.config.subscribe_bars {
326 self.subscribe_bars(bar_type, client_id, None);
327 }
328
329 }
334 }
335
336 if stats_interval_secs > 0 {
338 self.clock().set_timer(
339 "STATS-TIMER",
340 Duration::from_secs(stats_interval_secs),
341 None,
342 None,
343 None,
344 Some(true),
345 Some(false),
346 )?;
347 }
348
349 Ok(())
350 }
351
352 fn on_stop(&mut self) -> anyhow::Result<()> {
353 if !self.config.can_unsubscribe {
354 return Ok(());
355 }
356
357 let instrument_ids = self.config.instrument_ids.clone();
358 let client_id = self.config.client_id;
359
360 for instrument_id in instrument_ids {
361 if self.config.subscribe_instrument {
362 self.unsubscribe_instrument(instrument_id, client_id, None);
363 }
364
365 if self.config.subscribe_book_deltas {
366 self.unsubscribe_book_deltas(instrument_id, client_id, None);
367 }
368
369 if self.config.subscribe_book_at_interval {
370 self.unsubscribe_book_at_interval(
371 instrument_id,
372 self.config.book_interval_ms,
373 client_id,
374 None,
375 );
376 }
377
378 if self.config.subscribe_quotes {
384 self.unsubscribe_quotes(instrument_id, client_id, None);
385 }
386
387 if self.config.subscribe_trades {
388 self.unsubscribe_trades(instrument_id, client_id, None);
389 }
390
391 if self.config.subscribe_mark_prices {
392 self.unsubscribe_mark_prices(instrument_id, client_id, None);
393 }
394
395 if self.config.subscribe_index_prices {
396 self.unsubscribe_index_prices(instrument_id, client_id, None);
397 }
398
399 if self.config.subscribe_funding_rates {
400 self.unsubscribe_funding_rates(instrument_id, client_id, None);
401 }
402
403 if self.config.subscribe_instrument_status {
404 self.unsubscribe_instrument_status(instrument_id, client_id, None);
405 }
406
407 if self.config.subscribe_instrument_close {
408 self.unsubscribe_instrument_close(instrument_id, client_id, None);
409 }
410 }
411
412 if let Some(bar_types) = self.config.bar_types.clone() {
413 for bar_type in bar_types {
414 if self.config.subscribe_bars {
415 self.unsubscribe_bars(bar_type, client_id, None);
416 }
417 }
418 }
419
420 Ok(())
421 }
422
423 fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
424 Ok(())
426 }
427
428 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
429 if self.config.log_data {
430 log_info!("Received {instrument:?}", color = LogColor::Cyan);
431 }
432 Ok(())
433 }
434
435 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
436 if self.config.manage_book {
437 if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
438 book.apply_deltas(deltas)?;
439
440 if self.config.log_data {
441 let levels = self.config.book_levels_to_print;
442 let instrument_id = deltas.instrument_id;
443 let book_str = book.pprint(levels, None);
444 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
445 }
446 }
447 } else if self.config.log_data {
448 log_info!("Received {deltas:?}", color = LogColor::Cyan);
449 }
450 Ok(())
451 }
452
453 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
454 if self.config.log_data {
455 log_info!("Received {quote:?}", color = LogColor::Cyan);
456 }
457 Ok(())
458 }
459
460 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
461 if self.config.log_data {
462 log_info!("Received {trade:?}", color = LogColor::Cyan);
463 }
464 Ok(())
465 }
466
467 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
468 if self.config.log_data {
469 log_info!("Received {bar:?}", color = LogColor::Cyan);
470 }
471 Ok(())
472 }
473
474 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
475 if self.config.log_data {
476 log_info!("Received {mark_price:?}", color = LogColor::Cyan);
477 }
478 Ok(())
479 }
480
481 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
482 if self.config.log_data {
483 log_info!("Received {index_price:?}", color = LogColor::Cyan);
484 }
485 Ok(())
486 }
487
488 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
489 if self.config.log_data {
490 log_info!("Received {funding_rate:?}", color = LogColor::Cyan);
491 }
492 Ok(())
493 }
494
495 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
496 if self.config.log_data {
497 log_info!("Received {data:?}", color = LogColor::Cyan);
498 }
499 Ok(())
500 }
501
502 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
503 if self.config.log_data {
504 log_info!("Received {update:?}", color = LogColor::Cyan);
505 }
506 Ok(())
507 }
508}
509
510impl DataTester {
511 #[must_use]
513 pub fn new(config: DataTesterConfig) -> Self {
514 Self {
515 core: DataActorCore::new(config.base.clone()),
516 config,
517 books: AHashMap::new(),
518 }
519 }
520}
521
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        data::OrderBookDelta,
        enums::{InstrumentCloseType, MarketStatusAction},
        identifiers::Symbol,
        instruments::CurrencyPair,
        types::{Currency, Price, Quantity},
    };
    use rstest::*;
    use rust_decimal::Decimal;

    use super::*;

    /// Shared fixture: two instruments on the `TEST` venue with quotes and trades enabled.
    #[fixture]
    fn config() -> DataTesterConfig {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![
            InstrumentId::from("BTC-USDT.TEST"),
            InstrumentId::from("ETH-USDT.TEST"),
        ];
        DataTesterConfig::new(client_id, instrument_ids, true, true)
    }

    #[rstest]
    fn test_config_creation() {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
        let config = DataTesterConfig::new(client_id, instrument_ids.clone(), true, false);

        assert_eq!(config.client_id, Some(client_id));
        assert_eq!(config.instrument_ids, instrument_ids);
        assert!(config.subscribe_quotes);
        assert!(!config.subscribe_trades);
        assert!(config.log_data);
        assert_eq!(config.stats_interval_secs, 5);
    }

    #[rstest]
    fn test_config_default() {
        let config = DataTesterConfig::default();

        assert_eq!(config.client_id, None);
        assert!(config.instrument_ids.is_empty());
        assert!(!config.subscribe_quotes);
        assert!(!config.subscribe_trades);
        assert!(!config.subscribe_bars);
        assert!(config.can_unsubscribe);
        assert!(config.log_data);
    }

    #[rstest]
    fn test_actor_creation(config: DataTesterConfig) {
        let actor = DataTester::new(config);

        assert_eq!(actor.config.client_id, Some(ClientId::new("TEST")));
        assert_eq!(actor.config.instrument_ids.len(), 2);
    }

    #[rstest]
    fn test_on_quote_with_logging_enabled(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let quote = QuoteTick::default();
        let result = actor.on_quote(&quote);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_quote_with_logging_disabled(mut config: DataTesterConfig) {
        config.log_data = false;
        let mut actor = DataTester::new(config);

        let quote = QuoteTick::default();
        let result = actor.on_quote(&quote);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_trade(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let trade = TradeTick::default();
        let result = actor.on_trade(&trade);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_bar(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let bar = Bar::default();
        let result = actor.on_bar(&bar);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_instrument(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let instrument = CurrencyPair::new(
            instrument_id,
            Symbol::from("BTC/USDT"),
            Currency::USD(),
            Currency::USD(),
            4,
            3,
            Price::from("0.0001"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_instrument(&InstrumentAny::CurrencyPair(instrument));

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_book_deltas_without_managed_book(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let delta =
            OrderBookDelta::clear(instrument_id, 0, UnixNanos::default(), UnixNanos::default());
        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
        let result = actor.on_book_deltas(&deltas);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_mark_price(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let price = Price::from("50000.0");
        let mark_price = MarkPriceUpdate::new(
            instrument_id,
            price,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_mark_price(&mark_price);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_index_price(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let price = Price::from("50000.0");
        let index_price = IndexPriceUpdate::new(
            instrument_id,
            price,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_index_price(&index_price);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_funding_rate(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let funding_rate = FundingRateUpdate::new(
            instrument_id,
            Decimal::new(1, 4),
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_funding_rate(&funding_rate);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_instrument_status(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let status = InstrumentStatus::new(
            instrument_id,
            MarketStatusAction::Trading,
            UnixNanos::default(),
            UnixNanos::default(),
            None,
            None,
            None,
            None,
            None,
        );
        let result = actor.on_instrument_status(&status);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_instrument_close(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let price = Price::from("50000.0");
        let close = InstrumentClose::new(
            instrument_id,
            price,
            InstrumentCloseType::EndOfSession,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_instrument_close(&close);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_time_event(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let event = TimeEvent::new(
            "TEST".into(),
            Default::default(),
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_time_event(&event);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_config_with_all_subscriptions_enabled(mut config: DataTesterConfig) {
        config.subscribe_book_deltas = true;
        config.subscribe_book_at_interval = true;
        config.subscribe_bars = true;
        config.subscribe_mark_prices = true;
        config.subscribe_index_prices = true;
        config.subscribe_funding_rates = true;
        config.subscribe_instrument = true;
        config.subscribe_instrument_status = true;
        config.subscribe_instrument_close = true;

        let actor = DataTester::new(config);

        assert!(actor.config.subscribe_book_deltas);
        assert!(actor.config.subscribe_book_at_interval);
        assert!(actor.config.subscribe_bars);
        assert!(actor.config.subscribe_mark_prices);
        assert!(actor.config.subscribe_index_prices);
        assert!(actor.config.subscribe_funding_rates);
        assert!(actor.config.subscribe_instrument);
        assert!(actor.config.subscribe_instrument_status);
        assert!(actor.config.subscribe_instrument_close);
    }

    #[rstest]
    fn test_config_with_book_management(mut config: DataTesterConfig) {
        config.manage_book = true;
        config.book_levels_to_print = 5;

        let actor = DataTester::new(config);

        assert!(actor.config.manage_book);
        assert_eq!(actor.config.book_levels_to_print, 5);
        // Books are only created in `on_start`, so a freshly built actor has none.
        assert!(actor.books.is_empty());
    }

    #[rstest]
    fn test_config_with_custom_stats_interval(mut config: DataTesterConfig) {
        config.stats_interval_secs = 10;

        let actor = DataTester::new(config);

        assert_eq!(actor.config.stats_interval_secs, 10);
    }

    #[rstest]
    fn test_config_with_unsubscribe_disabled(mut config: DataTesterConfig) {
        config.can_unsubscribe = false;

        let actor = DataTester::new(config);

        assert!(!actor.config.can_unsubscribe);
    }
}