Skip to main content

nautilus_testkit/testers/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data tester actor for live testing market data subscriptions.
17
18use std::{
19    num::NonZeroUsize,
20    ops::{Deref, DerefMut},
21    time::Duration,
22};
23
24use ahash::{AHashMap, AHashSet};
25use chrono::Duration as ChronoDuration;
26use nautilus_common::{
27    actor::{DataActor, DataActorConfig, DataActorCore},
28    enums::LogColor,
29    log_info,
30    timer::TimeEvent,
31};
32use nautilus_model::{
33    data::{
34        Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
35        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, bar::BarType,
36    },
37    enums::BookType,
38    identifiers::{ClientId, InstrumentId},
39    instruments::InstrumentAny,
40    orderbook::OrderBook,
41};
42
/// Configuration for the data tester actor.
///
/// The `subscribe_*` flags select the live subscriptions opened when the actor
/// starts, and the `request_*` flags select the one-off requests issued at the
/// same time. Several flags are placeholders that the actor does not act on yet;
/// those are marked below.
#[derive(Debug, Clone)]
pub struct DataTesterConfig {
    /// Base data actor configuration.
    pub base: DataActorConfig,
    /// Instrument IDs to subscribe to.
    pub instrument_ids: Vec<InstrumentId>,
    /// Client ID to use for subscriptions and requests.
    pub client_id: Option<ClientId>,
    /// Bar types used for bar subscriptions and historical bar requests.
    pub bar_types: Option<Vec<BarType>>,
    /// Whether to subscribe to order book deltas.
    pub subscribe_book_deltas: bool,
    // TODO: Support subscribe_book_depth when the subscription method is available
    /// Whether to subscribe to order book depth snapshots (not yet implemented).
    pub subscribe_book_depth: bool,
    /// Whether to subscribe to order book at interval.
    pub subscribe_book_at_interval: bool,
    /// Whether to subscribe to quotes.
    pub subscribe_quotes: bool,
    /// Whether to subscribe to trades.
    pub subscribe_trades: bool,
    /// Whether to subscribe to mark prices.
    pub subscribe_mark_prices: bool,
    /// Whether to subscribe to index prices.
    pub subscribe_index_prices: bool,
    /// Whether to subscribe to funding rates.
    pub subscribe_funding_rates: bool,
    /// Whether to subscribe to bars (one subscription per entry in `bar_types`).
    pub subscribe_bars: bool,
    /// Whether to subscribe to instrument updates.
    pub subscribe_instrument: bool,
    /// Whether to subscribe to instrument status.
    pub subscribe_instrument_status: bool,
    /// Whether to subscribe to instrument close.
    pub subscribe_instrument_close: bool,
    // TODO: Support subscribe_params when we have a type-safe way to pass arbitrary params
    /// Whether unsubscribe is supported on stop; when `false` the actor skips
    /// all unsubscribe calls when it stops.
    pub can_unsubscribe: bool,
    /// Whether to request instruments on start (one request per distinct venue).
    pub request_instruments: bool,
    // TODO: Support request_quotes when historical data requests are available
    /// Whether to request historical quotes (not yet implemented).
    pub request_quotes: bool,
    /// Whether to request historical trades on start (the last hour).
    pub request_trades: bool,
    /// Whether to request historical bars on start (the last hour, per bar type).
    pub request_bars: bool,
    /// Whether to request order book snapshots.
    pub request_book_snapshot: bool,
    // TODO: Support request_book_deltas when Rust data engine has RequestBookDeltas
    /// Whether to request historical order book deltas (not yet implemented).
    pub request_book_deltas: bool,
    /// Whether to request historical funding rates on start (the last 7 days).
    pub request_funding_rates: bool,
    // TODO: Support requests_start_delta when we implement historical data requests
    /// Book type for order book subscriptions.
    pub book_type: BookType,
    /// Order book depth used for at-interval subscriptions and snapshot requests.
    pub book_depth: Option<NonZeroUsize>,
    // TODO: Support book_group_size when order book grouping is implemented
    /// Order book interval in milliseconds for at_interval subscriptions.
    pub book_interval_ms: NonZeroUsize,
    /// Number of order book levels to print when logging.
    pub book_levels_to_print: usize,
    /// Whether to manage local order book from deltas.
    pub manage_book: bool,
    /// Whether to log received data.
    pub log_data: bool,
    /// Stats timer interval in seconds (0 to disable).
    pub stats_interval_secs: u64,
}
115
116impl DataTesterConfig {
117    /// Creates a new [`DataTesterConfig`] instance with minimal settings.
118    ///
119    /// # Panics
120    ///
121    /// Panics if `NonZeroUsize::new(1000)` fails (which should never happen).
122    #[must_use]
123    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
124        Self {
125            base: DataActorConfig::default(),
126            instrument_ids,
127            client_id: Some(client_id),
128            bar_types: None,
129            subscribe_book_deltas: false,
130            subscribe_book_depth: false,
131            subscribe_book_at_interval: false,
132            subscribe_quotes: false,
133            subscribe_trades: false,
134            subscribe_mark_prices: false,
135            subscribe_index_prices: false,
136            subscribe_funding_rates: false,
137            subscribe_bars: false,
138
139            subscribe_instrument: false,
140            subscribe_instrument_status: false,
141            subscribe_instrument_close: false,
142            can_unsubscribe: true,
143            request_instruments: false,
144            request_quotes: false,
145            request_trades: false,
146            request_bars: false,
147            request_book_snapshot: false,
148            request_book_deltas: false,
149            request_funding_rates: false,
150            book_type: BookType::L2_MBP,
151            book_depth: None,
152            book_interval_ms: NonZeroUsize::new(1000).unwrap(),
153            book_levels_to_print: 10,
154            manage_book: true,
155            log_data: true,
156            stats_interval_secs: 5,
157        }
158    }
159
160    #[must_use]
161    pub fn with_log_data(mut self, log_data: bool) -> Self {
162        self.log_data = log_data;
163        self
164    }
165
166    #[must_use]
167    pub fn with_subscribe_book_deltas(mut self, subscribe: bool) -> Self {
168        self.subscribe_book_deltas = subscribe;
169        self
170    }
171
172    #[must_use]
173    pub fn with_subscribe_book_depth(mut self, subscribe: bool) -> Self {
174        self.subscribe_book_depth = subscribe;
175        self
176    }
177
178    #[must_use]
179    pub fn with_subscribe_book_at_interval(mut self, subscribe: bool) -> Self {
180        self.subscribe_book_at_interval = subscribe;
181        self
182    }
183
184    #[must_use]
185    pub fn with_subscribe_quotes(mut self, subscribe: bool) -> Self {
186        self.subscribe_quotes = subscribe;
187        self
188    }
189
190    #[must_use]
191    pub fn with_subscribe_trades(mut self, subscribe: bool) -> Self {
192        self.subscribe_trades = subscribe;
193        self
194    }
195
196    #[must_use]
197    pub fn with_subscribe_mark_prices(mut self, subscribe: bool) -> Self {
198        self.subscribe_mark_prices = subscribe;
199        self
200    }
201
202    #[must_use]
203    pub fn with_subscribe_index_prices(mut self, subscribe: bool) -> Self {
204        self.subscribe_index_prices = subscribe;
205        self
206    }
207
208    #[must_use]
209    pub fn with_subscribe_funding_rates(mut self, subscribe: bool) -> Self {
210        self.subscribe_funding_rates = subscribe;
211        self
212    }
213
214    #[must_use]
215    pub fn with_subscribe_bars(mut self, subscribe: bool) -> Self {
216        self.subscribe_bars = subscribe;
217        self
218    }
219
220    #[must_use]
221    pub fn with_bar_types(mut self, bar_types: Vec<BarType>) -> Self {
222        self.bar_types = Some(bar_types);
223        self
224    }
225
226    #[must_use]
227    pub fn with_subscribe_instrument(mut self, subscribe: bool) -> Self {
228        self.subscribe_instrument = subscribe;
229        self
230    }
231
232    #[must_use]
233    pub fn with_subscribe_instrument_status(mut self, subscribe: bool) -> Self {
234        self.subscribe_instrument_status = subscribe;
235        self
236    }
237
238    #[must_use]
239    pub fn with_subscribe_instrument_close(mut self, subscribe: bool) -> Self {
240        self.subscribe_instrument_close = subscribe;
241        self
242    }
243
244    #[must_use]
245    pub fn with_book_type(mut self, book_type: BookType) -> Self {
246        self.book_type = book_type;
247        self
248    }
249
250    #[must_use]
251    pub fn with_book_depth(mut self, depth: Option<NonZeroUsize>) -> Self {
252        self.book_depth = depth;
253        self
254    }
255
256    #[must_use]
257    pub fn with_book_interval_ms(mut self, interval_ms: NonZeroUsize) -> Self {
258        self.book_interval_ms = interval_ms;
259        self
260    }
261
262    #[must_use]
263    pub fn with_manage_book(mut self, manage: bool) -> Self {
264        self.manage_book = manage;
265        self
266    }
267
268    #[must_use]
269    pub fn with_request_instruments(mut self, request: bool) -> Self {
270        self.request_instruments = request;
271        self
272    }
273
274    #[must_use]
275    pub fn with_request_book_snapshot(mut self, request: bool) -> Self {
276        self.request_book_snapshot = request;
277        self
278    }
279
280    #[must_use]
281    pub fn with_request_book_deltas(mut self, request: bool) -> Self {
282        self.request_book_deltas = request;
283        self
284    }
285
286    #[must_use]
287    pub fn with_request_trades(mut self, request: bool) -> Self {
288        self.request_trades = request;
289        self
290    }
291
292    #[must_use]
293    pub fn with_request_bars(mut self, request: bool) -> Self {
294        self.request_bars = request;
295        self
296    }
297
298    #[must_use]
299    pub fn with_request_funding_rates(mut self, request: bool) -> Self {
300        self.request_funding_rates = request;
301        self
302    }
303
304    #[must_use]
305    pub fn with_can_unsubscribe(mut self, can_unsubscribe: bool) -> Self {
306        self.can_unsubscribe = can_unsubscribe;
307        self
308    }
309
310    #[must_use]
311    pub fn with_stats_interval_secs(mut self, interval_secs: u64) -> Self {
312        self.stats_interval_secs = interval_secs;
313        self
314    }
315}
316
317impl Default for DataTesterConfig {
318    fn default() -> Self {
319        Self {
320            base: DataActorConfig::default(),
321            instrument_ids: Vec::new(),
322            client_id: None,
323            bar_types: None,
324            subscribe_book_deltas: false,
325            subscribe_book_depth: false,
326            subscribe_book_at_interval: false,
327            subscribe_quotes: false,
328            subscribe_trades: false,
329            subscribe_mark_prices: false,
330            subscribe_index_prices: false,
331            subscribe_funding_rates: false,
332            subscribe_bars: false,
333            subscribe_instrument: false,
334            subscribe_instrument_status: false,
335            subscribe_instrument_close: false,
336            can_unsubscribe: true,
337            request_instruments: false,
338            request_quotes: false,
339            request_trades: false,
340            request_bars: false,
341            request_book_snapshot: false,
342            request_book_deltas: false,
343            request_funding_rates: false,
344            book_type: BookType::L2_MBP,
345            book_depth: None,
346            book_interval_ms: NonZeroUsize::new(1000).unwrap(),
347            book_levels_to_print: 10,
348            manage_book: false,
349            log_data: true,
350            stats_interval_secs: 5,
351        }
352    }
353}
354
/// A data tester actor for live testing market data subscriptions.
///
/// Subscribes to configured data types for specified instruments and logs
/// received data to demonstrate the data flow. Useful for testing adapters
/// and validating data connectivity.
///
/// This actor provides equivalent functionality to the Python `DataTester`
/// in the test kit.
#[derive(Debug)]
pub struct DataTester {
    /// Actor core; reachable from the outside through the `Deref`/`DerefMut` impls.
    core: DataActorCore,
    /// Settings supplied at construction.
    config: DataTesterConfig,
    /// Locally maintained order books keyed by instrument. An entry is created
    /// on start for each instrument when both `subscribe_book_deltas` and
    /// `manage_book` are enabled.
    books: AHashMap<InstrumentId, OrderBook>,
}
369
370impl Deref for DataTester {
371    type Target = DataActorCore;
372
373    fn deref(&self) -> &Self::Target {
374        &self.core
375    }
376}
377
378impl DerefMut for DataTester {
379    fn deref_mut(&mut self) -> &mut Self::Target {
380        &mut self.core
381    }
382}
383
384impl DataActor for DataTester {
385    fn on_start(&mut self) -> anyhow::Result<()> {
386        let instrument_ids = self.config.instrument_ids.clone();
387        let client_id = self.config.client_id;
388        let stats_interval_secs = self.config.stats_interval_secs;
389
390        // Request instruments if configured
391        if self.config.request_instruments {
392            let mut venues = AHashSet::new();
393            for instrument_id in &instrument_ids {
394                venues.insert(instrument_id.venue);
395            }
396
397            for venue in venues {
398                let _ = self.request_instruments(Some(venue), None, None, client_id, None);
399            }
400        }
401
402        // Subscribe to data for each instrument
403        for instrument_id in instrument_ids {
404            if self.config.subscribe_instrument {
405                self.subscribe_instrument(instrument_id, client_id, None);
406            }
407
408            if self.config.subscribe_book_deltas {
409                self.subscribe_book_deltas(
410                    instrument_id,
411                    self.config.book_type,
412                    None,
413                    client_id,
414                    self.config.manage_book,
415                    None,
416                );
417
418                if self.config.manage_book {
419                    let book = OrderBook::new(instrument_id, self.config.book_type);
420                    self.books.insert(instrument_id, book);
421                }
422            }
423
424            if self.config.subscribe_book_at_interval {
425                self.subscribe_book_at_interval(
426                    instrument_id,
427                    self.config.book_type,
428                    self.config.book_depth,
429                    self.config.book_interval_ms,
430                    client_id,
431                    None,
432                );
433            }
434
435            // TODO: Support subscribe_book_depth when the method is available
436            // if self.config.subscribe_book_depth {
437            //     self.subscribe_book_depth(
438            //         instrument_id,
439            //         self.config.book_type,
440            //         self.config.book_depth,
441            //         client_id,
442            //         None,
443            //     );
444            // }
445
446            if self.config.subscribe_quotes {
447                self.subscribe_quotes(instrument_id, client_id, None);
448            }
449
450            if self.config.subscribe_trades {
451                self.subscribe_trades(instrument_id, client_id, None);
452            }
453
454            if self.config.subscribe_mark_prices {
455                self.subscribe_mark_prices(instrument_id, client_id, None);
456            }
457
458            if self.config.subscribe_index_prices {
459                self.subscribe_index_prices(instrument_id, client_id, None);
460            }
461
462            if self.config.subscribe_funding_rates {
463                self.subscribe_funding_rates(instrument_id, client_id, None);
464            }
465
466            if self.config.subscribe_instrument_status {
467                self.subscribe_instrument_status(instrument_id, client_id, None);
468            }
469
470            if self.config.subscribe_instrument_close {
471                self.subscribe_instrument_close(instrument_id, client_id, None);
472            }
473
474            // TODO: Implement historical data requests
475            // if self.config.request_quotes {
476            //     self.request_quote_ticks(...);
477            // }
478
479            // Request order book snapshot if configured
480            if self.config.request_book_snapshot {
481                let _ = self.request_book_snapshot(
482                    instrument_id,
483                    self.config.book_depth,
484                    client_id,
485                    None,
486                );
487            }
488
489            // TODO: Request book deltas when Rust data engine has RequestBookDeltas
490
491            // Request historical trades (default to last 1 hour)
492            if self.config.request_trades {
493                let start = self.clock().utc_now() - ChronoDuration::hours(1);
494                if let Err(e) = self.request_trades(
495                    instrument_id,
496                    Some(start),
497                    None, // end: None means "now"
498                    None, // limit: None means use API default
499                    client_id,
500                    None, // params
501                ) {
502                    log::error!("Failed to request trades for {instrument_id}: {e}");
503                }
504            }
505
506            // Request historical funding rates (default to last 7 days)
507            if self.config.request_funding_rates {
508                let start = self.clock().utc_now() - ChronoDuration::days(7);
509                if let Err(e) = self.request_funding_rates(
510                    instrument_id,
511                    Some(start),
512                    None,
513                    None,
514                    client_id,
515                    None,
516                ) {
517                    log::error!("Failed to request funding rates for {instrument_id}: {e}");
518                }
519            }
520        }
521
522        // Subscribe to bars
523        if let Some(bar_types) = self.config.bar_types.clone() {
524            for bar_type in bar_types {
525                if self.config.subscribe_bars {
526                    self.subscribe_bars(bar_type, client_id, None);
527                }
528
529                // Request historical bars (default to last 1 hour)
530                if self.config.request_bars {
531                    let start = self.clock().utc_now() - ChronoDuration::hours(1);
532                    if let Err(e) = self.request_bars(
533                        bar_type,
534                        Some(start),
535                        None, // end: None means "now"
536                        None, // limit: None means use API default
537                        client_id,
538                        None, // params
539                    ) {
540                        log::error!("Failed to request bars for {bar_type}: {e}");
541                    }
542                }
543            }
544        }
545
546        // Set up stats timer
547        if stats_interval_secs > 0 {
548            self.clock().set_timer(
549                "STATS-TIMER",
550                Duration::from_secs(stats_interval_secs),
551                None,
552                None,
553                None,
554                Some(true),
555                Some(false),
556            )?;
557        }
558
559        Ok(())
560    }
561
562    fn on_stop(&mut self) -> anyhow::Result<()> {
563        if !self.config.can_unsubscribe {
564            return Ok(());
565        }
566
567        let instrument_ids = self.config.instrument_ids.clone();
568        let client_id = self.config.client_id;
569
570        for instrument_id in instrument_ids {
571            if self.config.subscribe_instrument {
572                self.unsubscribe_instrument(instrument_id, client_id, None);
573            }
574
575            if self.config.subscribe_book_deltas {
576                self.unsubscribe_book_deltas(instrument_id, client_id, None);
577            }
578
579            if self.config.subscribe_book_at_interval {
580                self.unsubscribe_book_at_interval(
581                    instrument_id,
582                    self.config.book_interval_ms,
583                    client_id,
584                    None,
585                );
586            }
587
588            // TODO: Support unsubscribe_book_depth when the method is available
589            // if self.config.subscribe_book_depth {
590            //     self.unsubscribe_book_depth(instrument_id, client_id, None);
591            // }
592
593            if self.config.subscribe_quotes {
594                self.unsubscribe_quotes(instrument_id, client_id, None);
595            }
596
597            if self.config.subscribe_trades {
598                self.unsubscribe_trades(instrument_id, client_id, None);
599            }
600
601            if self.config.subscribe_mark_prices {
602                self.unsubscribe_mark_prices(instrument_id, client_id, None);
603            }
604
605            if self.config.subscribe_index_prices {
606                self.unsubscribe_index_prices(instrument_id, client_id, None);
607            }
608
609            if self.config.subscribe_funding_rates {
610                self.unsubscribe_funding_rates(instrument_id, client_id, None);
611            }
612
613            if self.config.subscribe_instrument_status {
614                self.unsubscribe_instrument_status(instrument_id, client_id, None);
615            }
616
617            if self.config.subscribe_instrument_close {
618                self.unsubscribe_instrument_close(instrument_id, client_id, None);
619            }
620        }
621
622        if let Some(bar_types) = self.config.bar_types.clone() {
623            for bar_type in bar_types {
624                if self.config.subscribe_bars {
625                    self.unsubscribe_bars(bar_type, client_id, None);
626                }
627            }
628        }
629
630        Ok(())
631    }
632
633    fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
634        // Timer events are used by the actor but don't require specific handling
635        Ok(())
636    }
637
638    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
639        if self.config.log_data {
640            log_info!("{instrument:?}", color = LogColor::Cyan);
641        }
642        Ok(())
643    }
644
645    fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
646        if self.config.log_data {
647            let levels = self.config.book_levels_to_print;
648            let instrument_id = book.instrument_id;
649            let book_str = book.pprint(levels, None);
650            log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
651        }
652
653        Ok(())
654    }
655
656    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
657        if self.config.manage_book {
658            if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
659                book.apply_deltas(deltas)?;
660
661                if self.config.log_data {
662                    let levels = self.config.book_levels_to_print;
663                    let instrument_id = deltas.instrument_id;
664                    let book_str = book.pprint(levels, None);
665                    log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
666                }
667            }
668        } else if self.config.log_data {
669            log_info!("{deltas:?}", color = LogColor::Cyan);
670        }
671        Ok(())
672    }
673
674    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
675        if self.config.log_data {
676            log_info!("{quote:?}", color = LogColor::Cyan);
677        }
678        Ok(())
679    }
680
681    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
682        if self.config.log_data {
683            log_info!("{trade:?}", color = LogColor::Cyan);
684        }
685        Ok(())
686    }
687
688    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
689        if self.config.log_data {
690            log_info!("{bar:?}", color = LogColor::Cyan);
691        }
692        Ok(())
693    }
694
695    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
696        if self.config.log_data {
697            log_info!("{mark_price:?}", color = LogColor::Cyan);
698        }
699        Ok(())
700    }
701
702    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
703        if self.config.log_data {
704            log_info!("{index_price:?}", color = LogColor::Cyan);
705        }
706        Ok(())
707    }
708
709    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
710        if self.config.log_data {
711            log_info!("{funding_rate:?}", color = LogColor::Cyan);
712        }
713        Ok(())
714    }
715
716    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
717        if self.config.log_data {
718            log_info!("{data:?}", color = LogColor::Cyan);
719        }
720        Ok(())
721    }
722
723    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
724        if self.config.log_data {
725            log_info!("{update:?}", color = LogColor::Cyan);
726        }
727        Ok(())
728    }
729
730    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
731        if self.config.log_data {
732            log_info!(
733                "Received {} historical trades",
734                trades.len(),
735                color = LogColor::Cyan
736            );
737            for trade in trades.iter().take(5) {
738                log_info!("  {trade:?}", color = LogColor::Cyan);
739            }
740            if trades.len() > 5 {
741                log_info!(
742                    "  ... and {} more trades",
743                    trades.len() - 5,
744                    color = LogColor::Cyan
745                );
746            }
747        }
748        Ok(())
749    }
750
751    fn on_historical_funding_rates(
752        &mut self,
753        funding_rates: &[FundingRateUpdate],
754    ) -> anyhow::Result<()> {
755        if self.config.log_data {
756            log_info!(
757                "Received {} historical funding rates",
758                funding_rates.len(),
759                color = LogColor::Cyan
760            );
761            for rate in funding_rates.iter().take(5) {
762                log_info!("  {rate:?}", color = LogColor::Cyan);
763            }
764            if funding_rates.len() > 5 {
765                log_info!(
766                    "  ... and {} more funding rates",
767                    funding_rates.len() - 5,
768                    color = LogColor::Cyan
769                );
770            }
771        }
772        Ok(())
773    }
774
775    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
776        if self.config.log_data {
777            log_info!(
778                "Received {} historical bars",
779                bars.len(),
780                color = LogColor::Cyan
781            );
782            for bar in bars.iter().take(5) {
783                log_info!("  {bar:?}", color = LogColor::Cyan);
784            }
785            if bars.len() > 5 {
786                log_info!(
787                    "  ... and {} more bars",
788                    bars.len() - 5,
789                    color = LogColor::Cyan
790                );
791            }
792        }
793        Ok(())
794    }
795}
796
797impl DataTester {
798    /// Creates a new [`DataTester`] instance.
799    #[must_use]
800    pub fn new(config: DataTesterConfig) -> Self {
801        Self {
802            core: DataActorCore::new(config.base.clone()),
803            config,
804            books: AHashMap::new(),
805        }
806    }
807}
808
809#[cfg(test)]
810mod tests {
811    use nautilus_core::UnixNanos;
812    use nautilus_model::{
813        data::OrderBookDelta,
814        enums::{InstrumentCloseType, MarketStatusAction},
815        identifiers::Symbol,
816        instruments::CurrencyPair,
817        types::{Currency, Price, Quantity},
818    };
819    use rstest::*;
820    use rust_decimal::Decimal;
821
822    use super::*;
823
824    #[fixture]
825    fn config() -> DataTesterConfig {
826        let client_id = ClientId::new("TEST");
827        let instrument_ids = vec![
828            InstrumentId::from("BTC-USDT.TEST"),
829            InstrumentId::from("ETH-USDT.TEST"),
830        ];
831        DataTesterConfig::new(client_id, instrument_ids)
832            .with_subscribe_quotes(true)
833            .with_subscribe_trades(true)
834    }
835
836    #[rstest]
837    fn test_config_creation() {
838        let client_id = ClientId::new("TEST");
839        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
840        let config =
841            DataTesterConfig::new(client_id, instrument_ids.clone()).with_subscribe_quotes(true);
842
843        assert_eq!(config.client_id, Some(client_id));
844        assert_eq!(config.instrument_ids, instrument_ids);
845        assert!(config.subscribe_quotes);
846        assert!(!config.subscribe_trades);
847        assert!(config.log_data);
848        assert_eq!(config.stats_interval_secs, 5);
849    }
850
851    #[rstest]
852    fn test_config_default() {
853        let config = DataTesterConfig::default();
854
855        assert_eq!(config.client_id, None);
856        assert!(config.instrument_ids.is_empty());
857        assert!(!config.subscribe_quotes);
858        assert!(!config.subscribe_trades);
859        assert!(!config.subscribe_bars);
860        assert!(!config.request_instruments);
861        assert!(!config.request_book_snapshot);
862        assert!(!config.request_book_deltas);
863        assert!(!config.request_trades);
864        assert!(!config.request_bars);
865        assert!(!config.request_funding_rates);
866        assert!(config.can_unsubscribe);
867        assert!(config.log_data);
868    }
869
870    #[rstest]
871    fn test_actor_creation(config: DataTesterConfig) {
872        let actor = DataTester::new(config);
873
874        assert_eq!(actor.config.client_id, Some(ClientId::new("TEST")));
875        assert_eq!(actor.config.instrument_ids.len(), 2);
876    }
877
878    #[rstest]
879    fn test_on_quote_with_logging_enabled(config: DataTesterConfig) {
880        let mut actor = DataTester::new(config);
881
882        let quote = QuoteTick::default();
883        let result = actor.on_quote(&quote);
884
885        assert!(result.is_ok());
886    }
887
888    #[rstest]
889    fn test_on_quote_with_logging_disabled(mut config: DataTesterConfig) {
890        config.log_data = false;
891        let mut actor = DataTester::new(config);
892
893        let quote = QuoteTick::default();
894        let result = actor.on_quote(&quote);
895
896        assert!(result.is_ok());
897    }
898
899    #[rstest]
900    fn test_on_trade(config: DataTesterConfig) {
901        let mut actor = DataTester::new(config);
902
903        let trade = TradeTick::default();
904        let result = actor.on_trade(&trade);
905
906        assert!(result.is_ok());
907    }
908
909    #[rstest]
910    fn test_on_bar(config: DataTesterConfig) {
911        let mut actor = DataTester::new(config);
912
913        let bar = Bar::default();
914        let result = actor.on_bar(&bar);
915
916        assert!(result.is_ok());
917    }
918
919    #[rstest]
920    fn test_on_instrument(config: DataTesterConfig) {
921        let mut actor = DataTester::new(config);
922
923        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
924        let instrument = CurrencyPair::new(
925            instrument_id,
926            Symbol::from("BTC/USDT"),
927            Currency::USD(),
928            Currency::USD(),
929            4,
930            3,
931            Price::from("0.0001"),
932            Quantity::from("0.001"),
933            None,
934            None,
935            None,
936            None,
937            None,
938            None,
939            None,
940            None,
941            None,
942            None,
943            None,
944            None,
945            UnixNanos::default(),
946            UnixNanos::default(),
947        );
948        let result = actor.on_instrument(&InstrumentAny::CurrencyPair(instrument));
949
950        assert!(result.is_ok());
951    }
952
953    #[rstest]
954    fn test_on_book_deltas_without_managed_book(config: DataTesterConfig) {
955        let mut actor = DataTester::new(config);
956
957        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
958        let delta =
959            OrderBookDelta::clear(instrument_id, 0, UnixNanos::default(), UnixNanos::default());
960        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
961        let result = actor.on_book_deltas(&deltas);
962
963        assert!(result.is_ok());
964    }
965
966    #[rstest]
967    fn test_on_mark_price(config: DataTesterConfig) {
968        let mut actor = DataTester::new(config);
969
970        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
971        let price = Price::from("50000.0");
972        let mark_price = MarkPriceUpdate::new(
973            instrument_id,
974            price,
975            UnixNanos::default(),
976            UnixNanos::default(),
977        );
978        let result = actor.on_mark_price(&mark_price);
979
980        assert!(result.is_ok());
981    }
982
983    #[rstest]
984    fn test_on_index_price(config: DataTesterConfig) {
985        let mut actor = DataTester::new(config);
986
987        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
988        let price = Price::from("50000.0");
989        let index_price = IndexPriceUpdate::new(
990            instrument_id,
991            price,
992            UnixNanos::default(),
993            UnixNanos::default(),
994        );
995        let result = actor.on_index_price(&index_price);
996
997        assert!(result.is_ok());
998    }
999
1000    #[rstest]
1001    fn test_on_funding_rate(config: DataTesterConfig) {
1002        let mut actor = DataTester::new(config);
1003
1004        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1005        let funding_rate = FundingRateUpdate::new(
1006            instrument_id,
1007            Decimal::new(1, 4),
1008            None,
1009            UnixNanos::default(),
1010            UnixNanos::default(),
1011        );
1012        let result = actor.on_funding_rate(&funding_rate);
1013
1014        assert!(result.is_ok());
1015    }
1016
1017    #[rstest]
1018    fn test_on_historical_funding_rates(config: DataTesterConfig) {
1019        let mut actor = DataTester::new(config);
1020
1021        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1022        let rates = vec![
1023            FundingRateUpdate::new(
1024                instrument_id,
1025                Decimal::new(1, 4),
1026                None,
1027                UnixNanos::default(),
1028                UnixNanos::default(),
1029            ),
1030            FundingRateUpdate::new(
1031                instrument_id,
1032                Decimal::new(2, 4),
1033                None,
1034                UnixNanos::default(),
1035                UnixNanos::default(),
1036            ),
1037        ];
1038        let result = actor.on_historical_funding_rates(&rates);
1039
1040        assert!(result.is_ok());
1041    }
1042
1043    #[rstest]
1044    fn test_config_request_funding_rates() {
1045        let client_id = ClientId::new("TEST");
1046        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
1047        let config =
1048            DataTesterConfig::new(client_id, instrument_ids).with_request_funding_rates(true);
1049
1050        assert!(config.request_funding_rates);
1051    }
1052
1053    #[rstest]
1054    fn test_config_request_book_deltas() {
1055        let client_id = ClientId::new("TEST");
1056        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
1057        let config =
1058            DataTesterConfig::new(client_id, instrument_ids).with_request_book_deltas(true);
1059
1060        assert!(config.request_book_deltas);
1061    }
1062
1063    #[rstest]
1064    fn test_on_instrument_status(config: DataTesterConfig) {
1065        let mut actor = DataTester::new(config);
1066
1067        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1068        let status = InstrumentStatus::new(
1069            instrument_id,
1070            MarketStatusAction::Trading,
1071            UnixNanos::default(),
1072            UnixNanos::default(),
1073            None,
1074            None,
1075            None,
1076            None,
1077            None,
1078        );
1079        let result = actor.on_instrument_status(&status);
1080
1081        assert!(result.is_ok());
1082    }
1083
1084    #[rstest]
1085    fn test_on_instrument_close(config: DataTesterConfig) {
1086        let mut actor = DataTester::new(config);
1087
1088        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1089        let price = Price::from("50000.0");
1090        let close = InstrumentClose::new(
1091            instrument_id,
1092            price,
1093            InstrumentCloseType::EndOfSession,
1094            UnixNanos::default(),
1095            UnixNanos::default(),
1096        );
1097        let result = actor.on_instrument_close(&close);
1098
1099        assert!(result.is_ok());
1100    }
1101
1102    #[rstest]
1103    fn test_on_time_event(config: DataTesterConfig) {
1104        let mut actor = DataTester::new(config);
1105
1106        let event = TimeEvent::new(
1107            "TEST".into(),
1108            Default::default(),
1109            UnixNanos::default(),
1110            UnixNanos::default(),
1111        );
1112        let result = actor.on_time_event(&event);
1113
1114        assert!(result.is_ok());
1115    }
1116
1117    #[rstest]
1118    fn test_config_with_all_subscriptions_enabled(mut config: DataTesterConfig) {
1119        config.subscribe_book_deltas = true;
1120        config.subscribe_book_at_interval = true;
1121        config.subscribe_bars = true;
1122        config.subscribe_mark_prices = true;
1123        config.subscribe_index_prices = true;
1124        config.subscribe_funding_rates = true;
1125        config.subscribe_instrument = true;
1126        config.subscribe_instrument_status = true;
1127        config.subscribe_instrument_close = true;
1128
1129        let actor = DataTester::new(config);
1130
1131        assert!(actor.config.subscribe_book_deltas);
1132        assert!(actor.config.subscribe_book_at_interval);
1133        assert!(actor.config.subscribe_bars);
1134        assert!(actor.config.subscribe_mark_prices);
1135        assert!(actor.config.subscribe_index_prices);
1136        assert!(actor.config.subscribe_funding_rates);
1137        assert!(actor.config.subscribe_instrument);
1138        assert!(actor.config.subscribe_instrument_status);
1139        assert!(actor.config.subscribe_instrument_close);
1140    }
1141
1142    #[rstest]
1143    fn test_config_with_book_management(mut config: DataTesterConfig) {
1144        config.manage_book = true;
1145        config.book_levels_to_print = 5;
1146
1147        let actor = DataTester::new(config);
1148
1149        assert!(actor.config.manage_book);
1150        assert_eq!(actor.config.book_levels_to_print, 5);
1151        assert!(actor.books.is_empty());
1152    }
1153
1154    #[rstest]
1155    fn test_config_with_custom_stats_interval(mut config: DataTesterConfig) {
1156        config.stats_interval_secs = 10;
1157
1158        let actor = DataTester::new(config);
1159
1160        assert_eq!(actor.config.stats_interval_secs, 10);
1161    }
1162
1163    #[rstest]
1164    fn test_config_with_unsubscribe_disabled(mut config: DataTesterConfig) {
1165        config.can_unsubscribe = false;
1166
1167        let actor = DataTester::new(config);
1168
1169        assert!(!actor.config.can_unsubscribe);
1170    }
1171}