nautilus_databento/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env, fs,
19    path::{Path, PathBuf},
20};
21
22use databento::dbn;
23use dbn::{
24    Publisher,
25    compat::InstrumentDefMsgV1,
26    decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32    identifiers::{InstrumentId, Symbol, Venue},
33    instruments::InstrumentAny,
34    types::Currency,
35};
36
37use super::{
38    decode::{
39        decode_imbalance_msg, decode_instrument_def_msg_v1, decode_record, decode_statistics_msg,
40        decode_status_msg,
41    },
42    symbology::decode_nautilus_instrument_id,
43    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
44};
45use crate::symbology::MetadataCache;
46
47/// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
48///
49/// # Supported schemas:
50///  - MBO -> `OrderBookDelta`
51///  - MBP_1 -> `(QuoteTick, Option<TradeTick>)`
52///  - MBP_10 -> `OrderBookDepth10`
53///  - BBO_1S -> `QuoteTick`
54///  - BBO_1M -> `QuoteTick`
55///  - TBBO -> `(QuoteTick, TradeTick)`
56///  - TRADES -> `TradeTick`
57///  - OHLCV_1S -> `Bar`
58///  - OHLCV_1M -> `Bar`
59///  - OHLCV_1H -> `Bar`
60///  - OHLCV_1D -> `Bar`
61///  - DEFINITION -> `Instrument`
62///  - IMBALANCE -> `DatabentoImbalance`
63///  - STATISTICS -> `DatabentoStatistics`
64///  - STATUS -> `InstrumentStatus`
65///
66/// # References
67///
68/// <https://databento.com/docs/schemas-and-data-formats>
69#[cfg_attr(
70    feature = "python",
71    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
72)]
73#[derive(Debug)]
74pub struct DatabentoDataLoader {
75    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
76    venue_dataset_map: IndexMap<Venue, Dataset>,
77    publisher_venue_map: IndexMap<PublisherId, Venue>,
78    symbol_venue_map: HashMap<Symbol, Venue>,
79}
80
81impl DatabentoDataLoader {
82    /// Creates a new [`DatabentoDataLoader`] instance.
83    pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
84        let mut loader = Self {
85            publishers_map: IndexMap::new(),
86            venue_dataset_map: IndexMap::new(),
87            publisher_venue_map: IndexMap::new(),
88            symbol_venue_map: HashMap::new(),
89        };
90
91        // Load publishers
92        let publishers_filepath = if let Some(p) = publishers_filepath {
93            p
94        } else {
95            // Use built-in publishers path
96            let mut exe_path = env::current_exe()?;
97            exe_path.pop();
98            exe_path.push("publishers.json");
99            exe_path
100        };
101
102        loader
103            .load_publishers(publishers_filepath)
104            .unwrap_or_else(|e| panic!("Error loading publishers.json: {e}"));
105
106        Ok(loader)
107    }
108
109    /// Load the publishers data from the file at the given `filepath`.
110    pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
111        let file_content = fs::read_to_string(filepath)?;
112        let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
113
114        self.publishers_map = publishers
115            .clone()
116            .into_iter()
117            .map(|p| (p.publisher_id, p))
118            .collect();
119
120        let mut venue_dataset_map = IndexMap::new();
121
122        // Only insert a dataset if the venue key is not already in the map
123        for publisher in &publishers {
124            let venue = Venue::from(publisher.venue.as_str());
125            let dataset = Dataset::from(publisher.dataset.as_str());
126            venue_dataset_map.entry(venue).or_insert(dataset);
127        }
128
129        self.venue_dataset_map = venue_dataset_map;
130
131        // Insert CME Globex exchanges
132        let glbx = Dataset::from("GLBX.MDP3");
133        self.venue_dataset_map.insert(Venue::CBCM(), glbx);
134        self.venue_dataset_map.insert(Venue::GLBX(), glbx);
135        self.venue_dataset_map.insert(Venue::NYUM(), glbx);
136        self.venue_dataset_map.insert(Venue::XCBT(), glbx);
137        self.venue_dataset_map.insert(Venue::XCEC(), glbx);
138        self.venue_dataset_map.insert(Venue::XCME(), glbx);
139        self.venue_dataset_map.insert(Venue::XFXS(), glbx);
140        self.venue_dataset_map.insert(Venue::XNYM(), glbx);
141
142        self.publisher_venue_map = publishers
143            .into_iter()
144            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
145            .collect();
146
147        Ok(())
148    }
149
150    /// Returns the internal Databento publishers currently held by the loader.
151    #[must_use]
152    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
153        &self.publishers_map
154    }
155
156    /// Sets the `venue` to map to the given `dataset`.
157    pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
158        _ = self.venue_dataset_map.insert(venue, dataset);
159    }
160
161    /// Returns the dataset which matches the given `venue` (if found).
162    #[must_use]
163    pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
164        self.venue_dataset_map.get(venue)
165    }
166
167    /// Returns the venue which matches the given `publisher_id` (if found).
168    #[must_use]
169    pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
170        self.publisher_venue_map.get(&publisher_id)
171    }
172
173    /// Returns the schema for the given `filepath`.
174    pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
175        let decoder = Decoder::from_zstd_file(filepath)?;
176        let metadata = decoder.metadata();
177        Ok(metadata.schema.map(|schema| schema.to_string()))
178    }
179
180    pub fn read_definition_records(
181        &mut self,
182        filepath: &Path,
183        use_exchange_as_venue: bool,
184    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
185        let mut decoder = Decoder::from_zstd_file(filepath)?;
186
187        // Setting the policy to decode v1 data in its original format,
188        // rather than upgrading to v2 for now (decoding tests fail on `UpgradeToV2`).
189        let upgrade_policy = dbn::VersionUpgradePolicy::AsIs;
190        decoder.set_upgrade_policy(upgrade_policy);
191
192        let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsgV1>();
193
194        Ok(std::iter::from_fn(move || {
195            if let Err(e) = dbn_stream.advance() {
196                return Some(Err(e.into()));
197            }
198            match dbn_stream.get() {
199                Some(rec) => {
200                    let record = dbn::RecordRef::from(rec);
201                    let msg = record.get::<InstrumentDefMsgV1>().unwrap();
202
203                    let raw_symbol = rec.raw_symbol().expect("Error decoding `raw_symbol`");
204                    let symbol = Symbol::from(raw_symbol);
205
206                    let publisher = rec.hd.publisher().expect("Invalid `publisher` for record");
207                    let venue = match publisher {
208                        Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
209                            // SAFETY: GLBX instruments have a valid `exchange` field
210                            let exchange = rec.exchange().unwrap();
211                            let venue = Venue::from_code(exchange).unwrap_or_else(|_| {
212                                panic!("`Venue` not found for exchange {exchange}")
213                            });
214                            self.symbol_venue_map.insert(symbol, venue);
215                            venue
216                        }
217                        _ => *self
218                            .publisher_venue_map
219                            .get(&msg.hd.publisher_id)
220                            .expect("`Venue` not found `publisher_id`"),
221                    };
222                    let instrument_id = InstrumentId::new(symbol, venue);
223
224                    match decode_instrument_def_msg_v1(rec, instrument_id, msg.ts_recv.into()) {
225                        Ok(data) => Some(Ok(data)),
226                        Err(e) => Some(Err(e)),
227                    }
228                }
229                None => None,
230            }
231        }))
232    }
233
234    pub fn read_records<T>(
235        &self,
236        filepath: &Path,
237        instrument_id: Option<InstrumentId>,
238        price_precision: Option<u8>,
239        include_trades: bool,
240    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
241    where
242        T: dbn::Record + dbn::HasRType + 'static,
243    {
244        let decoder = Decoder::from_zstd_file(filepath)?;
245        let metadata = decoder.metadata().clone();
246        let mut metadata_cache = MetadataCache::new(metadata);
247        let mut dbn_stream = decoder.decode_stream::<T>();
248
249        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
250
251        Ok(std::iter::from_fn(move || {
252            if let Err(e) = dbn_stream.advance() {
253                return Some(Err(e.into()));
254            }
255            match dbn_stream.get() {
256                Some(rec) => {
257                    let record = dbn::RecordRef::from(rec);
258                    let instrument_id = match &instrument_id {
259                        Some(id) => *id, // Copy
260                        None => decode_nautilus_instrument_id(
261                            &record,
262                            &mut metadata_cache,
263                            &self.publisher_venue_map,
264                            &self.symbol_venue_map,
265                        )
266                        .expect("Failed to decode record"),
267                    };
268
269                    match decode_record(
270                        &record,
271                        instrument_id,
272                        price_precision,
273                        None,
274                        include_trades,
275                    ) {
276                        Ok(data) => Some(Ok(data)),
277                        Err(e) => Some(Err(e)),
278                    }
279                }
280                None => None,
281            }
282        }))
283    }
284
285    pub fn load_instruments(
286        &mut self,
287        filepath: &Path,
288        use_exchange_as_venue: bool,
289    ) -> anyhow::Result<Vec<InstrumentAny>> {
290        self.read_definition_records(filepath, use_exchange_as_venue)?
291            .collect::<Result<Vec<_>, _>>()
292    }
293
294    // Cannot include trades
295    pub fn load_order_book_deltas(
296        &self,
297        filepath: &Path,
298        instrument_id: Option<InstrumentId>,
299        price_precision: Option<u8>,
300    ) -> anyhow::Result<Vec<OrderBookDelta>> {
301        self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false)?
302            .filter_map(|result| match result {
303                Ok((Some(item1), _)) => {
304                    if let Data::Delta(delta) = item1 {
305                        Some(Ok(delta))
306                    } else {
307                        None
308                    }
309                }
310                Ok((None, _)) => None,
311                Err(e) => Some(Err(e)),
312            })
313            .collect()
314    }
315
316    pub fn load_order_book_depth10(
317        &self,
318        filepath: &Path,
319        instrument_id: Option<InstrumentId>,
320        price_precision: Option<u8>,
321    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
322        self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false)?
323            .filter_map(|result| match result {
324                Ok((Some(item1), _)) => {
325                    if let Data::Depth10(depth) = item1 {
326                        Some(Ok(*depth))
327                    } else {
328                        None
329                    }
330                }
331                Ok((None, _)) => None,
332                Err(e) => Some(Err(e)),
333            })
334            .collect()
335    }
336
337    pub fn load_quotes(
338        &self,
339        filepath: &Path,
340        instrument_id: Option<InstrumentId>,
341        price_precision: Option<u8>,
342    ) -> anyhow::Result<Vec<QuoteTick>> {
343        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false)?
344            .filter_map(|result| match result {
345                Ok((Some(item1), _)) => {
346                    if let Data::Quote(quote) = item1 {
347                        Some(Ok(quote))
348                    } else {
349                        None
350                    }
351                }
352                Ok((None, _)) => None,
353                Err(e) => Some(Err(e)),
354            })
355            .collect()
356    }
357
358    pub fn load_bbo_quotes(
359        &self,
360        filepath: &Path,
361        instrument_id: Option<InstrumentId>,
362        price_precision: Option<u8>,
363    ) -> anyhow::Result<Vec<QuoteTick>> {
364        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false)?
365            .filter_map(|result| match result {
366                Ok((Some(item1), _)) => {
367                    if let Data::Quote(quote) = item1 {
368                        Some(Ok(quote))
369                    } else {
370                        None
371                    }
372                }
373                Ok((None, _)) => None,
374                Err(e) => Some(Err(e)),
375            })
376            .collect()
377    }
378
379    pub fn load_tbbo_trades(
380        &self,
381        filepath: &Path,
382        instrument_id: Option<InstrumentId>,
383        price_precision: Option<u8>,
384    ) -> anyhow::Result<Vec<TradeTick>> {
385        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false)?
386            .filter_map(|result| match result {
387                Ok((_, maybe_item2)) => {
388                    if let Some(Data::Trade(trade)) = maybe_item2 {
389                        Some(Ok(trade))
390                    } else {
391                        None
392                    }
393                }
394                Err(e) => Some(Err(e)),
395            })
396            .collect()
397    }
398
399    pub fn load_trades(
400        &self,
401        filepath: &Path,
402        instrument_id: Option<InstrumentId>,
403        price_precision: Option<u8>,
404    ) -> anyhow::Result<Vec<TradeTick>> {
405        self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false)?
406            .filter_map(|result| match result {
407                Ok((Some(item1), _)) => {
408                    if let Data::Trade(trade) = item1 {
409                        Some(Ok(trade))
410                    } else {
411                        None
412                    }
413                }
414                Ok((None, _)) => None,
415                Err(e) => Some(Err(e)),
416            })
417            .collect()
418    }
419
420    pub fn load_bars(
421        &self,
422        filepath: &Path,
423        instrument_id: Option<InstrumentId>,
424        price_precision: Option<u8>,
425    ) -> anyhow::Result<Vec<Bar>> {
426        self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, price_precision, false)?
427            .filter_map(|result| match result {
428                Ok((Some(item1), _)) => {
429                    if let Data::Bar(bar) = item1 {
430                        Some(Ok(bar))
431                    } else {
432                        None
433                    }
434                }
435                Ok((None, _)) => None,
436                Err(e) => Some(Err(e)),
437            })
438            .collect()
439    }
440
441    pub fn load_status_records<T>(
442        &self,
443        filepath: &Path,
444        instrument_id: Option<InstrumentId>,
445    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
446    where
447        T: dbn::Record + dbn::HasRType + 'static,
448    {
449        let decoder = Decoder::from_zstd_file(filepath)?;
450        let metadata = decoder.metadata().clone();
451        let mut metadata_cache = MetadataCache::new(metadata);
452        let mut dbn_stream = decoder.decode_stream::<T>();
453
454        Ok(std::iter::from_fn(move || {
455            if let Err(e) = dbn_stream.advance() {
456                return Some(Err(e.into()));
457            }
458            match dbn_stream.get() {
459                Some(rec) => {
460                    let record = dbn::RecordRef::from(rec);
461                    let instrument_id = match &instrument_id {
462                        Some(id) => *id, // Copy
463                        None => decode_nautilus_instrument_id(
464                            &record,
465                            &mut metadata_cache,
466                            &self.publisher_venue_map,
467                            &self.symbol_venue_map,
468                        )
469                        .expect("Failed to decode record"),
470                    };
471
472                    let msg = record.get::<dbn::StatusMsg>().expect("Invalid `StatusMsg`");
473                    match decode_status_msg(msg, instrument_id, msg.ts_recv.into()) {
474                        Ok(data) => Some(Ok(data)),
475                        Err(e) => Some(Err(e)),
476                    }
477                }
478                None => None,
479            }
480        }))
481    }
482
483    pub fn read_imbalance_records<T>(
484        &self,
485        filepath: &Path,
486        instrument_id: Option<InstrumentId>,
487        price_precision: Option<u8>,
488    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
489    where
490        T: dbn::Record + dbn::HasRType + 'static,
491    {
492        let decoder = Decoder::from_zstd_file(filepath)?;
493        let metadata = decoder.metadata().clone();
494        let mut metadata_cache = MetadataCache::new(metadata);
495        let mut dbn_stream = decoder.decode_stream::<T>();
496
497        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
498
499        Ok(std::iter::from_fn(move || {
500            if let Err(e) = dbn_stream.advance() {
501                return Some(Err(e.into()));
502            }
503            match dbn_stream.get() {
504                Some(rec) => {
505                    let record = dbn::RecordRef::from(rec);
506                    let instrument_id = match &instrument_id {
507                        Some(id) => *id, // Copy
508                        None => decode_nautilus_instrument_id(
509                            &record,
510                            &mut metadata_cache,
511                            &self.publisher_venue_map,
512                            &self.symbol_venue_map,
513                        )
514                        .expect("Failed to decode record"),
515                    };
516
517                    let msg = record
518                        .get::<dbn::ImbalanceMsg>()
519                        .expect("Invalid `ImbalanceMsg`");
520                    match decode_imbalance_msg(
521                        msg,
522                        instrument_id,
523                        price_precision,
524                        msg.ts_recv.into(),
525                    ) {
526                        Ok(data) => Some(Ok(data)),
527                        Err(e) => Some(Err(e)),
528                    }
529                }
530                None => None,
531            }
532        }))
533    }
534
535    pub fn read_statistics_records<T>(
536        &self,
537        filepath: &Path,
538        instrument_id: Option<InstrumentId>,
539        price_precision: Option<u8>,
540    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
541    where
542        T: dbn::Record + dbn::HasRType + 'static,
543    {
544        let decoder = Decoder::from_zstd_file(filepath)?;
545        let metadata = decoder.metadata().clone();
546        let mut metadata_cache = MetadataCache::new(metadata);
547        let mut dbn_stream = decoder.decode_stream::<T>();
548
549        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
550
551        Ok(std::iter::from_fn(move || {
552            if let Err(e) = dbn_stream.advance() {
553                return Some(Err(e.into()));
554            }
555            match dbn_stream.get() {
556                Some(rec) => {
557                    let record = dbn::RecordRef::from(rec);
558                    let instrument_id = match &instrument_id {
559                        Some(id) => *id, // Copy
560                        None => decode_nautilus_instrument_id(
561                            &record,
562                            &mut metadata_cache,
563                            &self.publisher_venue_map,
564                            &self.symbol_venue_map,
565                        )
566                        .expect("Failed to decode record"),
567                    };
568
569                    let msg = record.get::<dbn::StatMsg>().expect("Invalid `StatMsg`");
570                    match decode_statistics_msg(
571                        msg,
572                        instrument_id,
573                        price_precision,
574                        msg.ts_recv.into(),
575                    ) {
576                        Ok(data) => Some(Ok(data)),
577                        Err(e) => Some(Err(e)),
578                    }
579                }
580                None => None,
581            }
582        }))
583    }
584}
585
586////////////////////////////////////////////////////////////////////////////////
587// Tests
588////////////////////////////////////////////////////////////////////////////////
589#[cfg(test)]
590mod tests {
591    use std::path::{Path, PathBuf};
592
593    use rstest::{fixture, rstest};
594    use ustr::Ustr;
595
596    use super::*;
597
598    fn test_data_path() -> PathBuf {
599        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
600    }
601
602    #[fixture]
603    fn loader() -> DatabentoDataLoader {
604        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
605        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
606    }
607
608    // TODO: Improve the below assertions that we've actually read the records we expected
609
610    #[rstest]
611    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
612        let dataset = Ustr::from("EQUS.PLUS");
613        let venue = Venue::from("XNAS");
614        loader.set_dataset_for_venue(dataset, venue);
615
616        let result = loader.get_dataset_for_venue(&venue).unwrap();
617        assert_eq!(*result, dataset);
618    }
619
620    #[rstest]
621    // #[case(test_data_path().join("test_data.definition.dbn.zst"))] // TODO: Fails
622    #[case(test_data_path().join("test_data.definition.v1.dbn.zst"))]
623    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
624        let instruments = loader.load_instruments(&path, false).unwrap();
625
626        assert_eq!(instruments.len(), 2);
627    }
628
629    #[rstest]
630    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
631        let path = test_data_path().join("test_data.mbo.dbn.zst");
632        let instrument_id = InstrumentId::from("ESM4.GLBX");
633
634        let deltas = loader
635            .load_order_book_deltas(&path, Some(instrument_id), None)
636            .unwrap();
637
638        assert_eq!(deltas.len(), 2);
639    }
640
641    #[rstest]
642    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
643        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
644        let instrument_id = InstrumentId::from("ESM4.GLBX");
645
646        let depths = loader
647            .load_order_book_depth10(&path, Some(instrument_id), None)
648            .unwrap();
649
650        assert_eq!(depths.len(), 2);
651    }
652
653    #[rstest]
654    fn test_load_quotes(loader: DatabentoDataLoader) {
655        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
656        let instrument_id = InstrumentId::from("ESM4.GLBX");
657
658        let quotes = loader
659            .load_quotes(&path, Some(instrument_id), None)
660            .unwrap();
661
662        assert_eq!(quotes.len(), 2);
663    }
664
665    #[rstest]
666    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
667    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
668    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
669        let instrument_id = InstrumentId::from("ESM4.GLBX");
670
671        let quotes = loader
672            .load_bbo_quotes(&path, Some(instrument_id), None)
673            .unwrap();
674
675        assert_eq!(quotes.len(), 2);
676    }
677
678    #[rstest]
679    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
680        let path = test_data_path().join("test_data.tbbo.dbn.zst");
681        let instrument_id = InstrumentId::from("ESM4.GLBX");
682
683        let _trades = loader
684            .load_tbbo_trades(&path, Some(instrument_id), None)
685            .unwrap();
686
687        // assert_eq!(trades.len(), 2);  TODO: No records?
688    }
689
690    #[rstest]
691    fn test_load_trades(loader: DatabentoDataLoader) {
692        let path = test_data_path().join("test_data.trades.dbn.zst");
693        let instrument_id = InstrumentId::from("ESM4.GLBX");
694        let trades = loader
695            .load_trades(&path, Some(instrument_id), None)
696            .unwrap();
697
698        assert_eq!(trades.len(), 2);
699    }
700
701    #[rstest]
702    // #[case(test_data_path().join("test_data.ohlcv-1d.dbn.zst"))]  // TODO: Needs new data
703    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
704    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
705    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
706    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
707        let instrument_id = InstrumentId::from("ESM4.GLBX");
708        let bars = loader.load_bars(&path, Some(instrument_id), None).unwrap();
709
710        assert_eq!(bars.len(), 2);
711    }
712}