nautilus_databento/
loader.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::{
17    env, fs,
18    path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25    Publisher,
26    decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32    identifiers::{InstrumentId, Symbol, Venue},
33    instruments::InstrumentAny,
34    types::Currency,
35};
36
37use super::{
38    decode::{decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg},
39    symbology::decode_nautilus_instrument_id,
40    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
41};
42use crate::{decode::decode_instrument_def_msg, symbology::MetadataCache};
43
44/// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
45///
46/// # Supported schemas:
47///  - `MBO` -> `OrderBookDelta`
48///  - `MBP_1` -> `(QuoteTick, Option<TradeTick>)`
49///  - `MBP_10` -> `OrderBookDepth10`
50///  - `BBO_1S` -> `QuoteTick`
51///  - `BBO_1M` -> `QuoteTick`
52///  - `CMBP_1` -> `(QuoteTick, Option<TradeTick>)`
53///  - `CBBO_1S` -> `QuoteTick`
54///  - `CBBO_1M` -> `QuoteTick`
55///  - `TCBBO` -> `(QuoteTick, TradeTick)`
56///  - `TBBO` -> `(QuoteTick, TradeTick)`
57///  - `TRADES` -> `TradeTick`
58///  - `OHLCV_1S` -> `Bar`
59///  - `OHLCV_1M` -> `Bar`
60///  - `OHLCV_1H` -> `Bar`
61///  - `OHLCV_1D` -> `Bar`
62///  - `OHLCV_EOD` -> `Bar`
63///  - `DEFINITION` -> `Instrument`
64///  - `IMBALANCE` -> `DatabentoImbalance`
65///  - `STATISTICS` -> `DatabentoStatistics`
66///  - `STATUS` -> `InstrumentStatus`
67///
68/// # References
69///
70/// <https://databento.com/docs/schemas-and-data-formats>
71#[cfg_attr(
72    feature = "python",
73    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
74)]
75#[derive(Debug)]
76pub struct DatabentoDataLoader {
77    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
78    venue_dataset_map: IndexMap<Venue, Dataset>,
79    publisher_venue_map: IndexMap<PublisherId, Venue>,
80    symbol_venue_map: AHashMap<Symbol, Venue>,
81}
82
83impl DatabentoDataLoader {
84    /// Creates a new [`DatabentoDataLoader`] instance.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if locating or loading publishers data fails.
89    pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
90        let mut loader = Self {
91            publishers_map: IndexMap::new(),
92            venue_dataset_map: IndexMap::new(),
93            publisher_venue_map: IndexMap::new(),
94            symbol_venue_map: AHashMap::new(),
95        };
96
97        // Load publishers
98        let publishers_filepath = if let Some(p) = publishers_filepath {
99            p
100        } else {
101            // Use built-in publishers path
102            let mut exe_path = env::current_exe()?;
103            exe_path.pop();
104            exe_path.push("publishers.json");
105            exe_path
106        };
107
108        loader
109            .load_publishers(publishers_filepath)
110            .context("error loading publishers.json")?;
111
112        Ok(loader)
113    }
114
115    /// Load the publishers data from the file at the given `filepath`.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the file cannot be read or parsed as JSON.
120    pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
121        let file_content = fs::read_to_string(filepath)?;
122        let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
123
124        self.publishers_map = publishers
125            .clone()
126            .into_iter()
127            .map(|p| (p.publisher_id, p))
128            .collect();
129
130        let mut venue_dataset_map = IndexMap::new();
131
132        // Only insert a dataset if the venue key is not already in the map
133        for publisher in &publishers {
134            let venue = Venue::from(publisher.venue.as_str());
135            let dataset = Dataset::from(publisher.dataset.as_str());
136            venue_dataset_map.entry(venue).or_insert(dataset);
137        }
138
139        self.venue_dataset_map = venue_dataset_map;
140
141        // Insert CME Globex exchanges
142        let glbx = Dataset::from("GLBX.MDP3");
143        self.venue_dataset_map.insert(Venue::CBCM(), glbx);
144        self.venue_dataset_map.insert(Venue::GLBX(), glbx);
145        self.venue_dataset_map.insert(Venue::NYUM(), glbx);
146        self.venue_dataset_map.insert(Venue::XCBT(), glbx);
147        self.venue_dataset_map.insert(Venue::XCEC(), glbx);
148        self.venue_dataset_map.insert(Venue::XCME(), glbx);
149        self.venue_dataset_map.insert(Venue::XFXS(), glbx);
150        self.venue_dataset_map.insert(Venue::XNYM(), glbx);
151
152        self.publisher_venue_map = publishers
153            .into_iter()
154            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
155            .collect();
156
157        Ok(())
158    }
159
160    /// Returns the internal Databento publishers currently held by the loader.
161    #[must_use]
162    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
163        &self.publishers_map
164    }
165
166    /// Sets the `venue` to map to the given `dataset`.
167    pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
168        _ = self.venue_dataset_map.insert(venue, dataset);
169    }
170
171    /// Returns the dataset which matches the given `venue` (if found).
172    #[must_use]
173    pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
174        self.venue_dataset_map.get(venue)
175    }
176
177    /// Returns the venue which matches the given `publisher_id` (if found).
178    #[must_use]
179    pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
180        self.publisher_venue_map.get(&publisher_id)
181    }
182
183    /// Returns the schema for the given `filepath`.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the file cannot be decoded or metadata retrieval fails.
188    pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
189        let decoder = Decoder::from_zstd_file(filepath)?;
190        let metadata = decoder.metadata();
191        Ok(metadata.schema.map(|schema| schema.to_string()))
192    }
193
194    /// Reads instrument definition records from a DBN file.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if decoding the definition records fails.
199    pub fn read_definition_records(
200        &mut self,
201        filepath: &Path,
202        use_exchange_as_venue: bool,
203    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
204        let decoder = Decoder::from_zstd_file(filepath)?;
205        let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
206
207        Ok(std::iter::from_fn(move || {
208            let result: anyhow::Result<Option<InstrumentAny>> = (|| {
209                dbn_stream
210                    .advance()
211                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
212
213                if let Some(rec) = dbn_stream.get() {
214                    let record = dbn::RecordRef::from(rec);
215                    let msg = record
216                        .get::<InstrumentDefMsg>()
217                        .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
218
219                    // Symbol and venue resolution
220                    let raw_symbol = rec
221                        .raw_symbol()
222                        .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
223                    let symbol = Symbol::from(raw_symbol);
224
225                    let publisher = rec
226                        .hd
227                        .publisher()
228                        .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
229                    let venue = match publisher {
230                        Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
231                            let exchange = rec.exchange().map_err(|e| {
232                                anyhow::anyhow!("Missing `exchange` for record: {e}")
233                            })?;
234                            let venue = Venue::from_code(exchange).map_err(|e| {
235                                anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
236                            })?;
237                            self.symbol_venue_map.insert(symbol, venue);
238                            venue
239                        }
240                        _ => *self
241                            .publisher_venue_map
242                            .get(&msg.hd.publisher_id)
243                            .ok_or_else(|| {
244                                anyhow::anyhow!(
245                                    "Venue not found for publisher_id {}",
246                                    msg.hd.publisher_id
247                                )
248                            })?,
249                    };
250                    let instrument_id = InstrumentId::new(symbol, venue);
251                    let ts_init = msg.ts_recv.into();
252
253                    let data = decode_instrument_def_msg(rec, instrument_id, Some(ts_init))?;
254                    Ok(Some(data))
255                } else {
256                    // No more records
257                    Ok(None)
258                }
259            })();
260
261            match result {
262                Ok(Some(item)) => Some(Ok(item)),
263                Ok(None) => None,
264                Err(e) => Some(Err(e)),
265            }
266        }))
267    }
268
269    /// Reads and decodes market data records from a DBN file.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if reading records fails.
274    pub fn read_records<T>(
275        &self,
276        filepath: &Path,
277        instrument_id: Option<InstrumentId>,
278        price_precision: Option<u8>,
279        include_trades: bool,
280        bars_timestamp_on_close: Option<bool>,
281    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
282    where
283        T: dbn::Record + dbn::HasRType + 'static,
284    {
285        let decoder = Decoder::from_zstd_file(filepath)?;
286        let metadata = decoder.metadata().clone();
287        let mut metadata_cache = MetadataCache::new(metadata);
288        let mut dbn_stream = decoder.decode_stream::<T>();
289
290        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
291
292        Ok(std::iter::from_fn(move || {
293            let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
294                dbn_stream
295                    .advance()
296                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
297                if let Some(rec) = dbn_stream.get() {
298                    let record = dbn::RecordRef::from(rec);
299                    let instrument_id = if let Some(id) = &instrument_id {
300                        *id
301                    } else {
302                        decode_nautilus_instrument_id(
303                            &record,
304                            &mut metadata_cache,
305                            &self.publisher_venue_map,
306                            &self.symbol_venue_map,
307                        )
308                        .context("failed to decode instrument id")?
309                    };
310                    let (item1, item2) = decode_record(
311                        &record,
312                        instrument_id,
313                        price_precision,
314                        None,
315                        include_trades,
316                        bars_timestamp_on_close.unwrap_or(true),
317                    )?;
318                    Ok(Some((item1, item2)))
319                } else {
320                    Ok(None)
321                }
322            })();
323            match result {
324                Ok(Some(v)) => Some(Ok(v)),
325                Ok(None) => None,
326                Err(e) => Some(Err(e)),
327            }
328        }))
329    }
330
331    /// Loads all instrument definitions from a DBN file.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if loading instruments fails.
336    pub fn load_instruments(
337        &mut self,
338        filepath: &Path,
339        use_exchange_as_venue: bool,
340    ) -> anyhow::Result<Vec<InstrumentAny>> {
341        self.read_definition_records(filepath, use_exchange_as_venue)?
342            .collect::<Result<Vec<_>, _>>()
343    }
344
345    /// Loads order book delta messages from a DBN MBO schema file.
346    ///
347    /// Cannot include trades.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if loading order book deltas fails.
352    pub fn load_order_book_deltas(
353        &self,
354        filepath: &Path,
355        instrument_id: Option<InstrumentId>,
356        price_precision: Option<u8>,
357    ) -> anyhow::Result<Vec<OrderBookDelta>> {
358        self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?
359            .filter_map(|result| match result {
360                Ok((Some(item1), _)) => {
361                    if let Data::Delta(delta) = item1 {
362                        Some(Ok(delta))
363                    } else {
364                        None
365                    }
366                }
367                Ok((None, _)) => None,
368                Err(e) => Some(Err(e)),
369            })
370            .collect()
371    }
372
373    /// Loads order book depth10 snapshots from a DBN MBP-10 schema file.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if loading order book depth10 fails.
378    pub fn load_order_book_depth10(
379        &self,
380        filepath: &Path,
381        instrument_id: Option<InstrumentId>,
382        price_precision: Option<u8>,
383    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
384        self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
385            .filter_map(|result| match result {
386                Ok((Some(item1), _)) => {
387                    if let Data::Depth10(depth) = item1 {
388                        Some(Ok(*depth))
389                    } else {
390                        None
391                    }
392                }
393                Ok((None, _)) => None,
394                Err(e) => Some(Err(e)),
395            })
396            .collect()
397    }
398
399    /// Loads quote tick messages from a DBN MBP-1 or TBBO schema file.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if loading quotes fails.
404    pub fn load_quotes(
405        &self,
406        filepath: &Path,
407        instrument_id: Option<InstrumentId>,
408        price_precision: Option<u8>,
409    ) -> anyhow::Result<Vec<QuoteTick>> {
410        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
411            .filter_map(|result| match result {
412                Ok((Some(item1), _)) => {
413                    if let Data::Quote(quote) = item1 {
414                        Some(Ok(quote))
415                    } else {
416                        None
417                    }
418                }
419                Ok((None, _)) => None,
420                Err(e) => Some(Err(e)),
421            })
422            .collect()
423    }
424
425    /// Loads best bid/offer quote messages from a DBN BBO schema file.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if loading BBO quotes fails.
430    pub fn load_bbo_quotes(
431        &self,
432        filepath: &Path,
433        instrument_id: Option<InstrumentId>,
434        price_precision: Option<u8>,
435    ) -> anyhow::Result<Vec<QuoteTick>> {
436        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
437            .filter_map(|result| match result {
438                Ok((Some(item1), _)) => {
439                    if let Data::Quote(quote) = item1 {
440                        Some(Ok(quote))
441                    } else {
442                        None
443                    }
444                }
445                Ok((None, _)) => None,
446                Err(e) => Some(Err(e)),
447            })
448            .collect()
449    }
450
451    /// Loads consolidated MBP-1 quote messages from a DBN CMBP-1 schema file.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if loading consolidated MBP-1 quotes fails.
456    pub fn load_cmbp_quotes(
457        &self,
458        filepath: &Path,
459        instrument_id: Option<InstrumentId>,
460        price_precision: Option<u8>,
461    ) -> anyhow::Result<Vec<QuoteTick>> {
462        self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
463            .filter_map(|result| match result {
464                Ok((Some(item1), _)) => {
465                    if let Data::Quote(quote) = item1 {
466                        Some(Ok(quote))
467                    } else {
468                        None
469                    }
470                }
471                Ok((None, _)) => None,
472                Err(e) => Some(Err(e)),
473            })
474            .collect()
475    }
476
477    /// Loads consolidated best bid/offer quote messages from a DBN CBBO schema file.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if loading consolidated BBO quotes fails.
482    pub fn load_cbbo_quotes(
483        &self,
484        filepath: &Path,
485        instrument_id: Option<InstrumentId>,
486        price_precision: Option<u8>,
487    ) -> anyhow::Result<Vec<QuoteTick>> {
488        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
489            .filter_map(|result| match result {
490                Ok((Some(item1), _)) => {
491                    if let Data::Quote(quote) = item1 {
492                        Some(Ok(quote))
493                    } else {
494                        None
495                    }
496                }
497                Ok((None, _)) => None,
498                Err(e) => Some(Err(e)),
499            })
500            .collect()
501    }
502
503    /// Loads trade messages from a DBN TBBO schema file.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if loading TBBO trades fails.
508    pub fn load_tbbo_trades(
509        &self,
510        filepath: &Path,
511        instrument_id: Option<InstrumentId>,
512        price_precision: Option<u8>,
513    ) -> anyhow::Result<Vec<TradeTick>> {
514        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?
515            .filter_map(|result| match result {
516                Ok((_, maybe_item2)) => {
517                    if let Some(Data::Trade(trade)) = maybe_item2 {
518                        Some(Ok(trade))
519                    } else {
520                        None
521                    }
522                }
523                Err(e) => Some(Err(e)),
524            })
525            .collect()
526    }
527
528    /// Loads trade messages from a DBN TCBBO schema file.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if loading TCBBO trades fails.
533    pub fn load_tcbbo_trades(
534        &self,
535        filepath: &Path,
536        instrument_id: Option<InstrumentId>,
537        price_precision: Option<u8>,
538    ) -> anyhow::Result<Vec<TradeTick>> {
539        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
540            .filter_map(|result| match result {
541                Ok((_, maybe_item2)) => {
542                    if let Some(Data::Trade(trade)) = maybe_item2 {
543                        Some(Ok(trade))
544                    } else {
545                        None
546                    }
547                }
548                Err(e) => Some(Err(e)),
549            })
550            .collect()
551    }
552
553    /// Loads trade messages from a DBN TRADES schema file.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if loading trades fails.
558    pub fn load_trades(
559        &self,
560        filepath: &Path,
561        instrument_id: Option<InstrumentId>,
562        price_precision: Option<u8>,
563    ) -> anyhow::Result<Vec<TradeTick>> {
564        self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
565            .filter_map(|result| match result {
566                Ok((Some(item1), _)) => {
567                    if let Data::Trade(trade) = item1 {
568                        Some(Ok(trade))
569                    } else {
570                        None
571                    }
572                }
573                Ok((None, _)) => None,
574                Err(e) => Some(Err(e)),
575            })
576            .collect()
577    }
578
579    /// Loads OHLCV bar messages from a DBN OHLCV schema file.
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if loading bars fails.
584    pub fn load_bars(
585        &self,
586        filepath: &Path,
587        instrument_id: Option<InstrumentId>,
588        price_precision: Option<u8>,
589        timestamp_on_close: Option<bool>,
590    ) -> anyhow::Result<Vec<Bar>> {
591        self.read_records::<dbn::OhlcvMsg>(
592            filepath,
593            instrument_id,
594            price_precision,
595            false,
596            timestamp_on_close,
597        )?
598        .filter_map(|result| match result {
599            Ok((Some(item1), _)) => {
600                if let Data::Bar(bar) = item1 {
601                    Some(Ok(bar))
602                } else {
603                    None
604                }
605            }
606            Ok((None, _)) => None,
607            Err(e) => Some(Err(e)),
608        })
609        .collect()
610    }
611
612    /// Loads instrument status messages from a DBN STATUS schema file.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if loading status records fails.
617    pub fn load_status_records<T>(
618        &self,
619        filepath: &Path,
620        instrument_id: Option<InstrumentId>,
621    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
622    where
623        T: dbn::Record + dbn::HasRType + 'static,
624    {
625        let decoder = Decoder::from_zstd_file(filepath)?;
626        let metadata = decoder.metadata().clone();
627        let mut metadata_cache = MetadataCache::new(metadata);
628        let mut dbn_stream = decoder.decode_stream::<T>();
629
630        Ok(std::iter::from_fn(move || {
631            if let Err(e) = dbn_stream.advance() {
632                return Some(Err(e.into()));
633            }
634            match dbn_stream.get() {
635                Some(rec) => {
636                    let record = dbn::RecordRef::from(rec);
637                    let instrument_id = match &instrument_id {
638                        Some(id) => *id, // Copy
639                        None => match decode_nautilus_instrument_id(
640                            &record,
641                            &mut metadata_cache,
642                            &self.publisher_venue_map,
643                            &self.symbol_venue_map,
644                        ) {
645                            Ok(id) => id,
646                            Err(e) => return Some(Err(e)),
647                        },
648                    };
649
650                    let msg = match record.get::<dbn::StatusMsg>() {
651                        Some(m) => m,
652                        None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
653                    };
654                    let ts_init = msg.ts_recv.into();
655
656                    match decode_status_msg(msg, instrument_id, Some(ts_init)) {
657                        Ok(data) => Some(Ok(data)),
658                        Err(e) => Some(Err(e)),
659                    }
660                }
661                None => None,
662            }
663        }))
664    }
665
666    /// Reads imbalance messages from a DBN IMBALANCE schema file.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if reading imbalance records fails.
671    pub fn read_imbalance_records<T>(
672        &self,
673        filepath: &Path,
674        instrument_id: Option<InstrumentId>,
675        price_precision: Option<u8>,
676    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
677    where
678        T: dbn::Record + dbn::HasRType + 'static,
679    {
680        let decoder = Decoder::from_zstd_file(filepath)?;
681        let metadata = decoder.metadata().clone();
682        let mut metadata_cache = MetadataCache::new(metadata);
683        let mut dbn_stream = decoder.decode_stream::<T>();
684
685        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
686
687        Ok(std::iter::from_fn(move || {
688            if let Err(e) = dbn_stream.advance() {
689                return Some(Err(e.into()));
690            }
691            match dbn_stream.get() {
692                Some(rec) => {
693                    let record = dbn::RecordRef::from(rec);
694                    let instrument_id = match &instrument_id {
695                        Some(id) => *id, // Copy
696                        None => match decode_nautilus_instrument_id(
697                            &record,
698                            &mut metadata_cache,
699                            &self.publisher_venue_map,
700                            &self.symbol_venue_map,
701                        ) {
702                            Ok(id) => id,
703                            Err(e) => return Some(Err(e)),
704                        },
705                    };
706
707                    let msg = match record.get::<dbn::ImbalanceMsg>() {
708                        Some(m) => m,
709                        None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
710                    };
711                    let ts_init = msg.ts_recv.into();
712
713                    match decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init)) {
714                        Ok(data) => Some(Ok(data)),
715                        Err(e) => Some(Err(e)),
716                    }
717                }
718                None => None,
719            }
720        }))
721    }
722
723    /// Reads statistics messages from a DBN STATISTICS schema file.
724    ///
725    /// # Errors
726    ///
727    /// Returns an error if reading statistics records fails.
728    pub fn read_statistics_records<T>(
729        &self,
730        filepath: &Path,
731        instrument_id: Option<InstrumentId>,
732        price_precision: Option<u8>,
733    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
734    where
735        T: dbn::Record + dbn::HasRType + 'static,
736    {
737        let decoder = Decoder::from_zstd_file(filepath)?;
738        let metadata = decoder.metadata().clone();
739        let mut metadata_cache = MetadataCache::new(metadata);
740        let mut dbn_stream = decoder.decode_stream::<T>();
741
742        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
743
744        Ok(std::iter::from_fn(move || {
745            if let Err(e) = dbn_stream.advance() {
746                return Some(Err(e.into()));
747            }
748            match dbn_stream.get() {
749                Some(rec) => {
750                    let record = dbn::RecordRef::from(rec);
751                    let instrument_id = match &instrument_id {
752                        Some(id) => *id, // Copy
753                        None => match decode_nautilus_instrument_id(
754                            &record,
755                            &mut metadata_cache,
756                            &self.publisher_venue_map,
757                            &self.symbol_venue_map,
758                        ) {
759                            Ok(id) => id,
760                            Err(e) => return Some(Err(e)),
761                        },
762                    };
763                    let msg = match record.get::<dbn::StatMsg>() {
764                        Some(m) => m,
765                        None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
766                    };
767                    let ts_init = msg.ts_recv.into();
768
769                    match decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
770                    {
771                        Ok(data) => Some(Ok(data)),
772                        Err(e) => Some(Err(e)),
773                    }
774                }
775                None => None,
776            }
777        }))
778    }
779}
780
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use nautilus_model::types::{Price, Quantity};
    use rstest::{fixture, rstest};
    use ustr::Ustr;

    use super::*;

    /// Nanoseconds in one second: the expected open-to-close span of a 1-second OHLCV bar.
    const ONE_SECOND_NS: u64 = 1_000_000_000;

    /// Returns the crate's manifest directory (where `publishers.json` and `test_data` live).
    fn manifest_dir() -> &'static Path {
        Path::new(env!("CARGO_MANIFEST_DIR"))
    }

    /// Returns the directory containing the DBN test data files.
    fn test_data_path() -> PathBuf {
        manifest_dir().join("test_data")
    }

    /// Instrument ID passed as the filter to the loader calls in the tests below.
    fn esm4_instrument_id() -> InstrumentId {
        InstrumentId::from("ESM4.GLBX")
    }

    /// Provides a loader configured with the crate's `publishers.json`.
    #[fixture]
    fn loader() -> DatabentoDataLoader {
        let publishers_filepath = manifest_dir().join("publishers.json");
        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
    }

    // TODO: Strengthen the count-only assertions below so they also verify the contents
    // of the decoded records

    #[rstest]
    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
        let dataset = Ustr::from("EQUS.PLUS");
        let venue = Venue::from("XNAS");
        loader.set_dataset_for_venue(dataset, venue);

        let result = loader.get_dataset_for_venue(&venue).unwrap();
        assert_eq!(*result, dataset);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.definition.dbn.zst"))]
    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instruments = loader.load_instruments(&path, false).unwrap();

        assert_eq!(instruments.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let deltas = loader
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let depths = loader
            .load_order_book_depth10(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(depths.len(), 2);
    }

    #[rstest]
    fn test_load_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let quotes = loader
            .load_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = esm4_instrument_id();

        let quotes = loader
            .load_bbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 4);
    }

    #[rstest]
    fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let quotes = loader
            .load_cmbp_quotes(&path, Some(instrument_id), None)
            .unwrap();

        // Verify exact data count
        assert_eq!(quotes.len(), 2);

        // Verify first quote fields
        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let quotes = loader
            .load_cbbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        // Verify exact data count
        assert_eq!(quotes.len(), 2);

        // Verify first quote fields
        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.tbbo.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let trades = loader
            .load_tbbo_trades(&path, Some(instrument_id), None)
            .unwrap();

        // TBBO test data doesn't contain valid trade data (size/price may be 0)
        assert_eq!(trades.len(), 0);
    }

    #[rstest]
    fn test_load_tcbbo_trades(loader: DatabentoDataLoader) {
        // There is no dedicated TCBBO test data, so CBBO data is used instead.
        // In practice, TCBBO records are CBBO messages that carry trade data.
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = esm4_instrument_id();

        // Unwrap directly (rather than `assert!(result.is_ok())`) so a failure reports the
        // loader's error instead of a bare "assertion failed"
        let trades = loader
            .load_tcbbo_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    fn test_load_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.trades.dbn.zst");
        let instrument_id = esm4_instrument_id();
        let trades = loader
            .load_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    // #[case(test_data_path().join("test_data.ohlcv-1d.dbn.zst"))]  // TODO: Empty file (0 records)
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = esm4_instrument_id();
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, None)
            .unwrap();

        assert_eq!(bars.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = esm4_instrument_id();
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();

        assert_eq!(bars.len(), 2);

        // When bars_timestamp_on_close is true, both ts_event and ts_init should be close time
        for bar in &bars {
            assert_eq!(
                bar.ts_event, bar.ts_init,
                "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
            );
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = esm4_instrument_id();
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();

        assert_eq!(bars.len(), 2);

        // When bars_timestamp_on_close is false, ts_event is open time and ts_init is close time
        for bar in &bars {
            assert_ne!(
                bar.ts_event, bar.ts_init,
                "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
            );
            // For 1-second bars, ts_init (close) should be 1 second after ts_event (open)
            assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + ONE_SECOND_NS);
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
    fn test_load_bars_timestamp_comparison(
        loader: DatabentoDataLoader,
        #[case] path: PathBuf,
        #[case] bar_index: usize,
    ) {
        let instrument_id = esm4_instrument_id();

        let bars_close = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();

        let bars_open = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();

        assert_eq!(bars_close.len(), bars_open.len());
        assert_eq!(bars_close.len(), 2);

        let bar_close = &bars_close[bar_index];
        let bar_open = &bars_open[bar_index];

        // Bars should have the same OHLCV data
        assert_eq!(bar_close.open, bar_open.open);
        assert_eq!(bar_close.high, bar_open.high);
        assert_eq!(bar_close.low, bar_open.low);
        assert_eq!(bar_close.close, bar_open.close);
        assert_eq!(bar_close.volume, bar_open.volume);

        // The close-timestamped bar should have a later timestamp than the open-timestamped
        // bar. This also guarantees the subtraction below cannot underflow.
        assert!(
            bar_close.ts_event > bar_open.ts_event,
            "Close-timestamped bar should have later timestamp than open-timestamped bar"
        );

        // For 1s bars, the difference should be exactly one second
        assert_eq!(
            bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
            ONE_SECOND_NS,
            "Timestamp difference should be exactly 1 second for 1s bars"
        );
    }

    #[rstest]
    fn test_load_status_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.status.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let statuses = loader
            .load_status_records::<dbn::StatusMsg>(&path, Some(instrument_id))
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        // Assert total count matches Python test expectations
        assert_eq!(statuses.len(), 4, "Should load exactly 4 status records");

        // Assert first record fields match Python test expectations
        let first = &statuses[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert_eq!(first.ts_event.as_u64(), 1609110000000000000);
        assert_eq!(first.ts_init.as_u64(), 1609113600000000000);
    }

    #[rstest]
    fn test_read_imbalance_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.imbalance.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let imbalances = loader
            .read_imbalance_records::<dbn::ImbalanceMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        // Assert total count
        assert_eq!(
            imbalances.len(),
            2,
            "Should load exactly 2 imbalance records"
        );

        // Assert first record has required fields
        let first = &imbalances[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(
            first.ref_price.as_f64() > 0.0,
            "ref_price should be positive"
        );
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
    }

    #[rstest]
    fn test_read_statistics_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.statistics.dbn.zst");
        let instrument_id = esm4_instrument_id();

        let statistics = loader
            .read_statistics_records::<dbn::StatMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        // Assert total count
        assert_eq!(
            statistics.len(),
            2,
            "Should load exactly 2 statistics records"
        );

        // Assert first record has required fields
        let first = &statistics[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
        assert!(first.sequence > 0, "sequence should be positive");
    }
}
1132}