nautilus_databento/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    env, fs,
18    path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25    Publisher,
26    decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32    identifiers::{InstrumentId, Symbol, Venue},
33    instruments::InstrumentAny,
34    types::Currency,
35};
36
37use super::{
38    decode::{decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg},
39    symbology::decode_nautilus_instrument_id,
40    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
41};
42use crate::{decode::decode_instrument_def_msg, symbology::MetadataCache};
43
44/// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
45///
46/// # Supported schemas:
47///  - `MBO` -> `OrderBookDelta`
48///  - `MBP_1` -> `(QuoteTick, Option<TradeTick>)`
49///  - `MBP_10` -> `OrderBookDepth10`
50///  - `BBO_1S` -> `QuoteTick`
51///  - `BBO_1M` -> `QuoteTick`
52///  - `CMBP_1` -> `(QuoteTick, Option<TradeTick>)`
53///  - `CBBO_1S` -> `QuoteTick`
54///  - `CBBO_1M` -> `QuoteTick`
55///  - `TCBBO` -> `(QuoteTick, TradeTick)`
56///  - `TBBO` -> `(QuoteTick, TradeTick)`
57///  - `TRADES` -> `TradeTick`
58///  - `OHLCV_1S` -> `Bar`
59///  - `OHLCV_1M` -> `Bar`
60///  - `OHLCV_1H` -> `Bar`
61///  - `OHLCV_1D` -> `Bar`
62///  - `OHLCV_EOD` -> `Bar`
63///  - `DEFINITION` -> `Instrument`
64///  - `IMBALANCE` -> `DatabentoImbalance`
65///  - `STATISTICS` -> `DatabentoStatistics`
66///  - `STATUS` -> `InstrumentStatus`
67///
68/// # References
69///
70/// <https://databento.com/docs/schemas-and-data-formats>
71#[cfg_attr(
72    feature = "python",
73    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
74)]
75#[derive(Debug)]
76pub struct DatabentoDataLoader {
77    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
78    venue_dataset_map: IndexMap<Venue, Dataset>,
79    publisher_venue_map: IndexMap<PublisherId, Venue>,
80    symbol_venue_map: AHashMap<Symbol, Venue>,
81}
82
83impl DatabentoDataLoader {
84    /// Creates a new [`DatabentoDataLoader`] instance.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if locating or loading publishers data fails.
89    pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
90        let mut loader = Self {
91            publishers_map: IndexMap::new(),
92            venue_dataset_map: IndexMap::new(),
93            publisher_venue_map: IndexMap::new(),
94            symbol_venue_map: AHashMap::new(),
95        };
96
97        // Load publishers
98        let publishers_filepath = if let Some(p) = publishers_filepath {
99            p
100        } else {
101            // Use built-in publishers path
102            let mut exe_path = env::current_exe()?;
103            exe_path.pop();
104            exe_path.push("publishers.json");
105            exe_path
106        };
107
108        loader
109            .load_publishers(publishers_filepath)
110            .context("Error loading publishers.json")?;
111
112        Ok(loader)
113    }
114
115    /// Load the publishers data from the file at the given `filepath`.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the file cannot be read or parsed as JSON.
120    pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
121        let file_content = fs::read_to_string(filepath)?;
122        let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
123
124        self.publishers_map = publishers
125            .clone()
126            .into_iter()
127            .map(|p| (p.publisher_id, p))
128            .collect();
129
130        let mut venue_dataset_map = IndexMap::new();
131
132        // Only insert a dataset if the venue key is not already in the map
133        for publisher in &publishers {
134            let venue = Venue::from(publisher.venue.as_str());
135            let dataset = Dataset::from(publisher.dataset.as_str());
136            venue_dataset_map.entry(venue).or_insert(dataset);
137        }
138
139        self.venue_dataset_map = venue_dataset_map;
140
141        // Insert CME Globex exchanges
142        let glbx = Dataset::from("GLBX.MDP3");
143        self.venue_dataset_map.insert(Venue::CBCM(), glbx);
144        self.venue_dataset_map.insert(Venue::GLBX(), glbx);
145        self.venue_dataset_map.insert(Venue::NYUM(), glbx);
146        self.venue_dataset_map.insert(Venue::XCBT(), glbx);
147        self.venue_dataset_map.insert(Venue::XCEC(), glbx);
148        self.venue_dataset_map.insert(Venue::XCME(), glbx);
149        self.venue_dataset_map.insert(Venue::XFXS(), glbx);
150        self.venue_dataset_map.insert(Venue::XNYM(), glbx);
151
152        self.publisher_venue_map = publishers
153            .into_iter()
154            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
155            .collect();
156
157        Ok(())
158    }
159
160    /// Returns the internal Databento publishers currently held by the loader.
161    #[must_use]
162    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
163        &self.publishers_map
164    }
165
166    /// Sets the `venue` to map to the given `dataset`.
167    pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
168        _ = self.venue_dataset_map.insert(venue, dataset);
169    }
170
171    /// Returns the dataset which matches the given `venue` (if found).
172    #[must_use]
173    pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
174        self.venue_dataset_map.get(venue)
175    }
176
177    /// Returns the venue which matches the given `publisher_id` (if found).
178    #[must_use]
179    pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
180        self.publisher_venue_map.get(&publisher_id)
181    }
182
183    /// Returns the schema for the given `filepath`.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the file cannot be decoded or metadata retrieval fails.
188    pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
189        let decoder = Decoder::from_zstd_file(filepath)?;
190        let metadata = decoder.metadata();
191        Ok(metadata.schema.map(|schema| schema.to_string()))
192    }
193
194    /// Reads instrument definition records from a DBN file.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if decoding the definition records fails.
199    pub fn read_definition_records(
200        &mut self,
201        filepath: &Path,
202        use_exchange_as_venue: bool,
203    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
204        let decoder = Decoder::from_zstd_file(filepath)?;
205        let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
206
207        Ok(std::iter::from_fn(move || {
208            let result: anyhow::Result<Option<InstrumentAny>> = (|| {
209                dbn_stream
210                    .advance()
211                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
212
213                if let Some(rec) = dbn_stream.get() {
214                    let record = dbn::RecordRef::from(rec);
215                    let msg = record
216                        .get::<InstrumentDefMsg>()
217                        .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
218
219                    // Symbol and venue resolution
220                    let raw_symbol = rec
221                        .raw_symbol()
222                        .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
223                    let symbol = Symbol::from(raw_symbol);
224
225                    let publisher = rec
226                        .hd
227                        .publisher()
228                        .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
229                    let venue = match publisher {
230                        Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
231                            let exchange = rec.exchange().map_err(|e| {
232                                anyhow::anyhow!("Missing `exchange` for record: {e}")
233                            })?;
234                            let venue = Venue::from_code(exchange).map_err(|e| {
235                                anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
236                            })?;
237                            self.symbol_venue_map.insert(symbol, venue);
238                            venue
239                        }
240                        _ => *self
241                            .publisher_venue_map
242                            .get(&msg.hd.publisher_id)
243                            .ok_or_else(|| {
244                                anyhow::anyhow!(
245                                    "Venue not found for publisher_id {}",
246                                    msg.hd.publisher_id
247                                )
248                            })?,
249                    };
250                    let instrument_id = InstrumentId::new(symbol, venue);
251                    let ts_init = msg.ts_recv.into();
252
253                    let data = decode_instrument_def_msg(rec, instrument_id, Some(ts_init))?;
254                    Ok(Some(data))
255                } else {
256                    // No more records
257                    Ok(None)
258                }
259            })();
260
261            match result {
262                Ok(Some(item)) => Some(Ok(item)),
263                Ok(None) => None,
264                Err(e) => Some(Err(e)),
265            }
266        }))
267    }
268
269    /// Reads and decodes market data records from a DBN file.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if reading records fails.
274    pub fn read_records<T>(
275        &self,
276        filepath: &Path,
277        instrument_id: Option<InstrumentId>,
278        price_precision: Option<u8>,
279        include_trades: bool,
280        bars_timestamp_on_close: Option<bool>,
281    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
282    where
283        T: dbn::Record + dbn::HasRType + 'static,
284    {
285        let decoder = Decoder::from_zstd_file(filepath)?;
286        let metadata = decoder.metadata().clone();
287        let mut metadata_cache = MetadataCache::new(metadata);
288        let mut dbn_stream = decoder.decode_stream::<T>();
289
290        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
291
292        Ok(std::iter::from_fn(move || {
293            let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
294                dbn_stream
295                    .advance()
296                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
297                if let Some(rec) = dbn_stream.get() {
298                    let record = dbn::RecordRef::from(rec);
299                    let instrument_id = if let Some(id) = &instrument_id {
300                        *id
301                    } else {
302                        decode_nautilus_instrument_id(
303                            &record,
304                            &mut metadata_cache,
305                            &self.publisher_venue_map,
306                            &self.symbol_venue_map,
307                        )
308                        .context("Failed to decode instrument id")?
309                    };
310                    let (item1, item2) = decode_record(
311                        &record,
312                        instrument_id,
313                        price_precision,
314                        None,
315                        include_trades,
316                        bars_timestamp_on_close.unwrap_or(true),
317                    )?;
318                    Ok(Some((item1, item2)))
319                } else {
320                    Ok(None)
321                }
322            })();
323            match result {
324                Ok(Some(v)) => Some(Ok(v)),
325                Ok(None) => None,
326                Err(e) => Some(Err(e)),
327            }
328        }))
329    }
330
331    /// Loads all instrument definitions from a DBN file.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if loading instruments fails.
336    pub fn load_instruments(
337        &mut self,
338        filepath: &Path,
339        use_exchange_as_venue: bool,
340    ) -> anyhow::Result<Vec<InstrumentAny>> {
341        self.read_definition_records(filepath, use_exchange_as_venue)?
342            .collect::<Result<Vec<_>, _>>()
343    }
344
345    /// Loads order book delta messages from a DBN MBO schema file.
346    ///
347    /// Cannot include trades.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if loading order book deltas fails.
352    pub fn load_order_book_deltas(
353        &self,
354        filepath: &Path,
355        instrument_id: Option<InstrumentId>,
356        price_precision: Option<u8>,
357    ) -> anyhow::Result<Vec<OrderBookDelta>> {
358        self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?
359            .filter_map(|result| match result {
360                Ok((Some(item1), _)) => {
361                    if let Data::Delta(delta) = item1 {
362                        Some(Ok(delta))
363                    } else {
364                        None
365                    }
366                }
367                Ok((None, _)) => None,
368                Err(e) => Some(Err(e)),
369            })
370            .collect()
371    }
372
373    /// Loads order book depth10 snapshots from a DBN MBP-10 schema file.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if loading order book depth10 fails.
378    pub fn load_order_book_depth10(
379        &self,
380        filepath: &Path,
381        instrument_id: Option<InstrumentId>,
382        price_precision: Option<u8>,
383    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
384        self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
385            .filter_map(|result| match result {
386                Ok((Some(item1), _)) => {
387                    if let Data::Depth10(depth) = item1 {
388                        Some(Ok(*depth))
389                    } else {
390                        None
391                    }
392                }
393                Ok((None, _)) => None,
394                Err(e) => Some(Err(e)),
395            })
396            .collect()
397    }
398
399    /// Loads quote tick messages from a DBN MBP-1 or TBBO schema file.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if loading quotes fails.
404    pub fn load_quotes(
405        &self,
406        filepath: &Path,
407        instrument_id: Option<InstrumentId>,
408        price_precision: Option<u8>,
409    ) -> anyhow::Result<Vec<QuoteTick>> {
410        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
411            .filter_map(|result| match result {
412                Ok((Some(item1), _)) => {
413                    if let Data::Quote(quote) = item1 {
414                        Some(Ok(quote))
415                    } else {
416                        None
417                    }
418                }
419                Ok((None, _)) => None,
420                Err(e) => Some(Err(e)),
421            })
422            .collect()
423    }
424
425    /// Loads best bid/offer quote messages from a DBN BBO schema file.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if loading BBO quotes fails.
430    pub fn load_bbo_quotes(
431        &self,
432        filepath: &Path,
433        instrument_id: Option<InstrumentId>,
434        price_precision: Option<u8>,
435    ) -> anyhow::Result<Vec<QuoteTick>> {
436        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
437            .filter_map(|result| match result {
438                Ok((Some(item1), _)) => {
439                    if let Data::Quote(quote) = item1 {
440                        Some(Ok(quote))
441                    } else {
442                        None
443                    }
444                }
445                Ok((None, _)) => None,
446                Err(e) => Some(Err(e)),
447            })
448            .collect()
449    }
450
451    /// Loads consolidated MBP-1 quote messages from a DBN CMBP-1 schema file.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if loading consolidated MBP-1 quotes fails.
456    pub fn load_cmbp_quotes(
457        &self,
458        filepath: &Path,
459        instrument_id: Option<InstrumentId>,
460        price_precision: Option<u8>,
461    ) -> anyhow::Result<Vec<QuoteTick>> {
462        self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
463            .filter_map(|result| match result {
464                Ok((Some(item1), _)) => {
465                    if let Data::Quote(quote) = item1 {
466                        Some(Ok(quote))
467                    } else {
468                        None
469                    }
470                }
471                Ok((None, _)) => None,
472                Err(e) => Some(Err(e)),
473            })
474            .collect()
475    }
476
477    /// Loads consolidated best bid/offer quote messages from a DBN CBBO schema file.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if loading consolidated BBO quotes fails.
482    pub fn load_cbbo_quotes(
483        &self,
484        filepath: &Path,
485        instrument_id: Option<InstrumentId>,
486        price_precision: Option<u8>,
487    ) -> anyhow::Result<Vec<QuoteTick>> {
488        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
489            .filter_map(|result| match result {
490                Ok((Some(item1), _)) => {
491                    if let Data::Quote(quote) = item1 {
492                        Some(Ok(quote))
493                    } else {
494                        None
495                    }
496                }
497                Ok((None, _)) => None,
498                Err(e) => Some(Err(e)),
499            })
500            .collect()
501    }
502
503    /// Loads trade messages from a DBN TBBO schema file.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if loading TBBO trades fails.
508    pub fn load_tbbo_trades(
509        &self,
510        filepath: &Path,
511        instrument_id: Option<InstrumentId>,
512        price_precision: Option<u8>,
513    ) -> anyhow::Result<Vec<TradeTick>> {
514        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?
515            .filter_map(|result| match result {
516                Ok((_, maybe_item2)) => {
517                    if let Some(Data::Trade(trade)) = maybe_item2 {
518                        Some(Ok(trade))
519                    } else {
520                        None
521                    }
522                }
523                Err(e) => Some(Err(e)),
524            })
525            .collect()
526    }
527
528    /// Loads trade messages from a DBN TCBBO schema file.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if loading TCBBO trades fails.
533    pub fn load_tcbbo_trades(
534        &self,
535        filepath: &Path,
536        instrument_id: Option<InstrumentId>,
537        price_precision: Option<u8>,
538    ) -> anyhow::Result<Vec<TradeTick>> {
539        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
540            .filter_map(|result| match result {
541                Ok((_, maybe_item2)) => {
542                    if let Some(Data::Trade(trade)) = maybe_item2 {
543                        Some(Ok(trade))
544                    } else {
545                        None
546                    }
547                }
548                Err(e) => Some(Err(e)),
549            })
550            .collect()
551    }
552
553    /// Loads trade messages from a DBN TRADES schema file.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if loading trades fails.
558    pub fn load_trades(
559        &self,
560        filepath: &Path,
561        instrument_id: Option<InstrumentId>,
562        price_precision: Option<u8>,
563    ) -> anyhow::Result<Vec<TradeTick>> {
564        self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
565            .filter_map(|result| match result {
566                Ok((Some(item1), _)) => {
567                    if let Data::Trade(trade) = item1 {
568                        Some(Ok(trade))
569                    } else {
570                        None
571                    }
572                }
573                Ok((None, _)) => None,
574                Err(e) => Some(Err(e)),
575            })
576            .collect()
577    }
578
579    /// Loads OHLCV bar messages from a DBN OHLCV schema file.
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if loading bars fails.
584    pub fn load_bars(
585        &self,
586        filepath: &Path,
587        instrument_id: Option<InstrumentId>,
588        price_precision: Option<u8>,
589        timestamp_on_close: Option<bool>,
590    ) -> anyhow::Result<Vec<Bar>> {
591        self.read_records::<dbn::OhlcvMsg>(
592            filepath,
593            instrument_id,
594            price_precision,
595            false,
596            timestamp_on_close,
597        )?
598        .filter_map(|result| match result {
599            Ok((Some(item1), _)) => {
600                if let Data::Bar(bar) = item1 {
601                    Some(Ok(bar))
602                } else {
603                    None
604                }
605            }
606            Ok((None, _)) => None,
607            Err(e) => Some(Err(e)),
608        })
609        .collect()
610    }
611
612    /// Loads instrument status messages from a DBN STATUS schema file.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if loading status records fails.
617    pub fn load_status_records<T>(
618        &self,
619        filepath: &Path,
620        instrument_id: Option<InstrumentId>,
621    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
622    where
623        T: dbn::Record + dbn::HasRType + 'static,
624    {
625        let decoder = Decoder::from_zstd_file(filepath)?;
626        let metadata = decoder.metadata().clone();
627        let mut metadata_cache = MetadataCache::new(metadata);
628        let mut dbn_stream = decoder.decode_stream::<T>();
629
630        Ok(std::iter::from_fn(move || {
631            if let Err(e) = dbn_stream.advance() {
632                return Some(Err(e.into()));
633            }
634            match dbn_stream.get() {
635                Some(rec) => {
636                    let record = dbn::RecordRef::from(rec);
637                    let instrument_id = match &instrument_id {
638                        Some(id) => *id, // Copy
639                        None => match decode_nautilus_instrument_id(
640                            &record,
641                            &mut metadata_cache,
642                            &self.publisher_venue_map,
643                            &self.symbol_venue_map,
644                        ) {
645                            Ok(id) => id,
646                            Err(e) => return Some(Err(e)),
647                        },
648                    };
649
650                    let msg = match record.get::<dbn::StatusMsg>() {
651                        Some(m) => m,
652                        None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
653                    };
654                    let ts_init = msg.ts_recv.into();
655
656                    match decode_status_msg(msg, instrument_id, Some(ts_init)) {
657                        Ok(data) => Some(Ok(data)),
658                        Err(e) => Some(Err(e)),
659                    }
660                }
661                None => None,
662            }
663        }))
664    }
665
666    /// Reads imbalance messages from a DBN IMBALANCE schema file.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if reading imbalance records fails.
671    pub fn read_imbalance_records<T>(
672        &self,
673        filepath: &Path,
674        instrument_id: Option<InstrumentId>,
675        price_precision: Option<u8>,
676    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
677    where
678        T: dbn::Record + dbn::HasRType + 'static,
679    {
680        let decoder = Decoder::from_zstd_file(filepath)?;
681        let metadata = decoder.metadata().clone();
682        let mut metadata_cache = MetadataCache::new(metadata);
683        let mut dbn_stream = decoder.decode_stream::<T>();
684
685        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
686
687        Ok(std::iter::from_fn(move || {
688            if let Err(e) = dbn_stream.advance() {
689                return Some(Err(e.into()));
690            }
691            match dbn_stream.get() {
692                Some(rec) => {
693                    let record = dbn::RecordRef::from(rec);
694                    let instrument_id = match &instrument_id {
695                        Some(id) => *id, // Copy
696                        None => match decode_nautilus_instrument_id(
697                            &record,
698                            &mut metadata_cache,
699                            &self.publisher_venue_map,
700                            &self.symbol_venue_map,
701                        ) {
702                            Ok(id) => id,
703                            Err(e) => return Some(Err(e)),
704                        },
705                    };
706
707                    let msg = match record.get::<dbn::ImbalanceMsg>() {
708                        Some(m) => m,
709                        None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
710                    };
711                    let ts_init = msg.ts_recv.into();
712
713                    match decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init)) {
714                        Ok(data) => Some(Ok(data)),
715                        Err(e) => Some(Err(e)),
716                    }
717                }
718                None => None,
719            }
720        }))
721    }
722
723    /// Reads statistics messages from a DBN STATISTICS schema file.
724    ///
725    /// # Errors
726    ///
727    /// Returns an error if reading statistics records fails.
728    pub fn read_statistics_records<T>(
729        &self,
730        filepath: &Path,
731        instrument_id: Option<InstrumentId>,
732        price_precision: Option<u8>,
733    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
734    where
735        T: dbn::Record + dbn::HasRType + 'static,
736    {
737        let decoder = Decoder::from_zstd_file(filepath)?;
738        let metadata = decoder.metadata().clone();
739        let mut metadata_cache = MetadataCache::new(metadata);
740        let mut dbn_stream = decoder.decode_stream::<T>();
741
742        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
743
744        Ok(std::iter::from_fn(move || {
745            if let Err(e) = dbn_stream.advance() {
746                return Some(Err(e.into()));
747            }
748            match dbn_stream.get() {
749                Some(rec) => {
750                    let record = dbn::RecordRef::from(rec);
751                    let instrument_id = match &instrument_id {
752                        Some(id) => *id, // Copy
753                        None => match decode_nautilus_instrument_id(
754                            &record,
755                            &mut metadata_cache,
756                            &self.publisher_venue_map,
757                            &self.symbol_venue_map,
758                        ) {
759                            Ok(id) => id,
760                            Err(e) => return Some(Err(e)),
761                        },
762                    };
763                    let msg = match record.get::<dbn::StatMsg>() {
764                        Some(m) => m,
765                        None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
766                    };
767                    let ts_init = msg.ts_recv.into();
768
769                    match decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
770                    {
771                        Ok(data) => Some(Ok(data)),
772                        Err(e) => Some(Err(e)),
773                    }
774                }
775                None => None,
776            }
777        }))
778    }
779}
780
781////////////////////////////////////////////////////////////////////////////////
782// Tests
783////////////////////////////////////////////////////////////////////////////////
784#[cfg(test)]
785mod tests {
786    use std::path::{Path, PathBuf};
787
788    use nautilus_model::types::{Price, Quantity};
789    use rstest::{fixture, rstest};
790    use ustr::Ustr;
791
792    use super::*;
793
794    fn test_data_path() -> PathBuf {
795        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
796    }
797
798    #[fixture]
799    fn loader() -> DatabentoDataLoader {
800        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
801        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
802    }
803
    // TODO: Strengthen the assertions below to verify that the expected records were actually read
805
806    #[rstest]
807    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
808        let dataset = Ustr::from("EQUS.PLUS");
809        let venue = Venue::from("XNAS");
810        loader.set_dataset_for_venue(dataset, venue);
811
812        let result = loader.get_dataset_for_venue(&venue).unwrap();
813        assert_eq!(*result, dataset);
814    }
815
816    #[rstest]
817    #[case(test_data_path().join("test_data.definition.dbn.zst"))]
818    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
819        let instruments = loader.load_instruments(&path, false).unwrap();
820
821        assert_eq!(instruments.len(), 2);
822    }
823
824    #[rstest]
825    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
826        let path = test_data_path().join("test_data.mbo.dbn.zst");
827        let instrument_id = InstrumentId::from("ESM4.GLBX");
828
829        let deltas = loader
830            .load_order_book_deltas(&path, Some(instrument_id), None)
831            .unwrap();
832
833        assert_eq!(deltas.len(), 2);
834    }
835
836    #[rstest]
837    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
838        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
839        let instrument_id = InstrumentId::from("ESM4.GLBX");
840
841        let depths = loader
842            .load_order_book_depth10(&path, Some(instrument_id), None)
843            .unwrap();
844
845        assert_eq!(depths.len(), 2);
846    }
847
848    #[rstest]
849    fn test_load_quotes(loader: DatabentoDataLoader) {
850        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
851        let instrument_id = InstrumentId::from("ESM4.GLBX");
852
853        let quotes = loader
854            .load_quotes(&path, Some(instrument_id), None)
855            .unwrap();
856
857        assert_eq!(quotes.len(), 2);
858    }
859
860    #[rstest]
861    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
862    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
863    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
864        let instrument_id = InstrumentId::from("ESM4.GLBX");
865
866        let quotes = loader
867            .load_bbo_quotes(&path, Some(instrument_id), None)
868            .unwrap();
869
870        assert_eq!(quotes.len(), 4);
871    }
872
873    #[rstest]
874    fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
875        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
876        let instrument_id = InstrumentId::from("ESM4.GLBX");
877
878        let quotes = loader
879            .load_cmbp_quotes(&path, Some(instrument_id), None)
880            .unwrap();
881
882        // Verify exact data count
883        assert_eq!(quotes.len(), 2);
884
885        // Verify first quote fields
886        let first_quote = &quotes[0];
887        assert_eq!(first_quote.instrument_id, instrument_id);
888        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
889        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
890        assert_eq!(first_quote.bid_size, Quantity::from(24));
891        assert_eq!(first_quote.ask_size, Quantity::from(11));
892        assert_eq!(first_quote.ts_event, 1609160400006136329);
893        assert_eq!(first_quote.ts_init, 1609160400006136329);
894    }
895
896    #[rstest]
897    fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
898        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
899        let instrument_id = InstrumentId::from("ESM4.GLBX");
900
901        let quotes = loader
902            .load_cbbo_quotes(&path, Some(instrument_id), None)
903            .unwrap();
904
905        // Verify exact data count
906        assert_eq!(quotes.len(), 2);
907
908        // Verify first quote fields
909        let first_quote = &quotes[0];
910        assert_eq!(first_quote.instrument_id, instrument_id);
911        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
912        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
913        assert_eq!(first_quote.bid_size, Quantity::from(24));
914        assert_eq!(first_quote.ask_size, Quantity::from(11));
915        assert_eq!(first_quote.ts_event, 1609160400006136329);
916        assert_eq!(first_quote.ts_init, 1609160400006136329);
917    }
918
919    #[rstest]
920    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
921        let path = test_data_path().join("test_data.tbbo.dbn.zst");
922        let instrument_id = InstrumentId::from("ESM4.GLBX");
923
924        let trades = loader
925            .load_tbbo_trades(&path, Some(instrument_id), None)
926            .unwrap();
927
928        // TBBO test data doesn't contain valid trade data (size/price may be 0)
929        assert_eq!(trades.len(), 0);
930    }
931
932    #[rstest]
933    fn test_load_tcbbo_trades(loader: DatabentoDataLoader) {
934        // Since we don't have dedicated TCBBO test data, we'll use CBBO data
935        // In practice, TCBBO would be CBBO messages with trade data
936        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
937        let instrument_id = InstrumentId::from("ESM4.GLBX");
938
939        let result = loader.load_tcbbo_trades(&path, Some(instrument_id), None);
940
941        assert!(result.is_ok());
942        let trades = result.unwrap();
943        assert_eq!(trades.len(), 2);
944    }
945
946    #[rstest]
947    fn test_load_trades(loader: DatabentoDataLoader) {
948        let path = test_data_path().join("test_data.trades.dbn.zst");
949        let instrument_id = InstrumentId::from("ESM4.GLBX");
950        let trades = loader
951            .load_trades(&path, Some(instrument_id), None)
952            .unwrap();
953
954        assert_eq!(trades.len(), 2);
955    }
956
957    #[rstest]
958    // #[case(test_data_path().join("test_data.ohlcv-1d.dbn.zst"))]  // TODO: Empty file (0 records)
959    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
960    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
961    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
962    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
963        let instrument_id = InstrumentId::from("ESM4.GLBX");
964        let bars = loader
965            .load_bars(&path, Some(instrument_id), None, None)
966            .unwrap();
967
968        assert_eq!(bars.len(), 2);
969    }
970
971    #[rstest]
972    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
973    fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
974        let instrument_id = InstrumentId::from("ESM4.GLBX");
975        let bars = loader
976            .load_bars(&path, Some(instrument_id), None, Some(true))
977            .unwrap();
978
979        assert_eq!(bars.len(), 2);
980
981        // When bars_timestamp_on_close is true, both ts_event and ts_init should be close time
982        for bar in &bars {
983            assert_eq!(
984                bar.ts_event, bar.ts_init,
985                "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
986            );
987        }
988    }
989
990    #[rstest]
991    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
992    fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
993        let instrument_id = InstrumentId::from("ESM4.GLBX");
994        let bars = loader
995            .load_bars(&path, Some(instrument_id), None, Some(false))
996            .unwrap();
997
998        assert_eq!(bars.len(), 2);
999
1000        // When bars_timestamp_on_close is false, ts_event is open time and ts_init is close time
1001        for bar in &bars {
1002            assert_ne!(
1003                bar.ts_event, bar.ts_init,
1004                "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
1005            );
1006            // For 1-second bars, ts_init (close) should be 1 second after ts_event (open)
1007            assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + 1_000_000_000);
1008        }
1009    }
1010
1011    #[rstest]
1012    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
1013    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
1014    fn test_load_bars_timestamp_comparison(
1015        loader: DatabentoDataLoader,
1016        #[case] path: PathBuf,
1017        #[case] bar_index: usize,
1018    ) {
1019        let instrument_id = InstrumentId::from("ESM4.GLBX");
1020
1021        let bars_close = loader
1022            .load_bars(&path, Some(instrument_id), None, Some(true))
1023            .unwrap();
1024
1025        let bars_open = loader
1026            .load_bars(&path, Some(instrument_id), None, Some(false))
1027            .unwrap();
1028
1029        assert_eq!(bars_close.len(), bars_open.len());
1030        assert_eq!(bars_close.len(), 2);
1031
1032        let bar_close = &bars_close[bar_index];
1033        let bar_open = &bars_open[bar_index];
1034
1035        // Bars should have the same OHLCV data
1036        assert_eq!(bar_close.open, bar_open.open);
1037        assert_eq!(bar_close.high, bar_open.high);
1038        assert_eq!(bar_close.low, bar_open.low);
1039        assert_eq!(bar_close.close, bar_open.close);
1040        assert_eq!(bar_close.volume, bar_open.volume);
1041
1042        // The close-timestamped bar should have later timestamp than open-timestamped bar
1043        // For 1-second bars, this should be exactly 1 second difference
1044        assert!(
1045            bar_close.ts_event > bar_open.ts_event,
1046            "Close-timestamped bar should have later timestamp than open-timestamped bar"
1047        );
1048
1049        // The difference should be exactly 1 second (1_000_000_000 nanoseconds) for 1s bars
1050        const ONE_SECOND_NS: u64 = 1_000_000_000;
1051        assert_eq!(
1052            bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
1053            ONE_SECOND_NS,
1054            "Timestamp difference should be exactly 1 second for 1s bars"
1055        );
1056    }
1057}