nautilus_databento/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env, fs,
19    path::{Path, PathBuf},
20};
21
22use databento::dbn;
23use dbn::{
24    Publisher,
25    compat::InstrumentDefMsgV1,
26    decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32    identifiers::{InstrumentId, Symbol, Venue},
33    instruments::InstrumentAny,
34    types::Currency,
35};
36use ustr::Ustr;
37
38use super::{
39    decode::{
40        decode_imbalance_msg, decode_instrument_def_msg_v1, decode_record, decode_statistics_msg,
41        decode_status_msg,
42    },
43    symbology::decode_nautilus_instrument_id,
44    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
45};
46
/// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
///
/// The loader keeps lookup tables built from a publishers file (see
/// `load_publishers`) and uses them to resolve venues and instrument IDs while
/// decoding records.
///
/// # Supported schemas:
///  - MBO -> `OrderBookDelta`
///  - MBP_1 -> `(QuoteTick, Option<TradeTick>)`
///  - MBP_10 -> `OrderBookDepth10`
///  - BBO_1S -> `QuoteTick`
///  - BBO_1M -> `QuoteTick`
///  - TBBO -> `(QuoteTick, TradeTick)`
///  - TRADES -> `TradeTick`
///  - OHLCV_1S -> `Bar`
///  - OHLCV_1M -> `Bar`
///  - OHLCV_1H -> `Bar`
///  - OHLCV_1D -> `Bar`
///  - DEFINITION -> `Instrument`
///  - IMBALANCE -> `DatabentoImbalance`
///  - STATISTICS -> `DatabentoStatistics`
///  - STATUS -> `InstrumentStatus`
///
/// # References
///
/// <https://databento.com/docs/schemas-and-data-formats>
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataLoader {
    /// Publishers keyed by their publisher ID, as read from the publishers file.
    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
    /// Dataset for each venue; built from the publishers file plus the CME Globex
    /// venues inserted by `load_publishers`.
    venue_dataset_map: IndexMap<Venue, Dataset>,
    /// Venue for each publisher ID, as read from the publishers file.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Venue overrides per symbol, recorded by `read_definition_records` when the
    /// exchange is used as the venue for GLBX instruments.
    symbol_venue_map: HashMap<Symbol, Venue>,
}
80
81impl DatabentoDataLoader {
82    /// Creates a new [`DatabentoDataLoader`] instance.
83    pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
84        let mut loader = Self {
85            publishers_map: IndexMap::new(),
86            venue_dataset_map: IndexMap::new(),
87            publisher_venue_map: IndexMap::new(),
88            symbol_venue_map: HashMap::new(),
89        };
90
91        // Load publishers
92        let publishers_filepath = if let Some(p) = publishers_filepath {
93            p
94        } else {
95            // Use built-in publishers path
96            let mut exe_path = env::current_exe()?;
97            exe_path.pop();
98            exe_path.push("publishers.json");
99            exe_path
100        };
101
102        loader
103            .load_publishers(publishers_filepath)
104            .unwrap_or_else(|e| panic!("Error loading publishers.json: {e}",));
105
106        Ok(loader)
107    }
108
109    /// Load the publishers data from the file at the given `filepath`.
110    pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
111        let file_content = fs::read_to_string(filepath)?;
112        let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
113
114        self.publishers_map = publishers
115            .clone()
116            .into_iter()
117            .map(|p| (p.publisher_id, p))
118            .collect::<IndexMap<u16, DatabentoPublisher>>();
119
120        self.venue_dataset_map = publishers
121            .iter()
122            .map(|p| {
123                (
124                    Venue::from(p.venue.as_str()),
125                    Dataset::from(p.dataset.as_str()),
126                )
127            })
128            .collect::<IndexMap<Venue, Ustr>>();
129
130        // Insert CME Globex exchanges
131        let glbx = Dataset::from("GLBX.MDP3");
132        self.venue_dataset_map.insert(Venue::CBCM(), glbx);
133        self.venue_dataset_map.insert(Venue::GLBX(), glbx);
134        self.venue_dataset_map.insert(Venue::NYUM(), glbx);
135        self.venue_dataset_map.insert(Venue::XCBT(), glbx);
136        self.venue_dataset_map.insert(Venue::XCEC(), glbx);
137        self.venue_dataset_map.insert(Venue::XCME(), glbx);
138        self.venue_dataset_map.insert(Venue::XFXS(), glbx);
139        self.venue_dataset_map.insert(Venue::XNYM(), glbx);
140
141        self.publisher_venue_map = publishers
142            .into_iter()
143            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
144            .collect::<IndexMap<u16, Venue>>();
145
146        Ok(())
147    }
148
    /// Return the internal Databento publishers currently held by the loader,
    /// keyed by publisher ID.
    #[must_use]
    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
        &self.publishers_map
    }
154
    /// Return the dataset which matches the given `venue` (if found).
    #[must_use]
    pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
        self.venue_dataset_map.get(venue)
    }
160
    /// Return the venue which matches the given `publisher_id` (if found).
    #[must_use]
    pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
        self.publisher_venue_map.get(&publisher_id)
    }
166
167    pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
168        let decoder = Decoder::from_zstd_file(filepath)?;
169        let metadata = decoder.metadata();
170        Ok(metadata.schema.map(|schema| schema.to_string()))
171    }
172
173    pub fn read_definition_records(
174        &mut self,
175        filepath: &Path,
176        use_exchange_as_venue: bool,
177    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
178        let mut decoder = Decoder::from_zstd_file(filepath)?;
179
180        // Setting the policy to decode v1 data in its original format,
181        // rather than upgrading to v2 for now (decoding tests fail on `UpgradeToV2`).
182        let upgrade_policy = dbn::VersionUpgradePolicy::AsIs;
183        decoder.set_upgrade_policy(upgrade_policy);
184
185        let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsgV1>();
186
187        Ok(std::iter::from_fn(move || {
188            if let Err(e) = dbn_stream.advance() {
189                return Some(Err(e.into()));
190            }
191            match dbn_stream.get() {
192                Some(rec) => {
193                    let record = dbn::RecordRef::from(rec);
194                    let msg = record.get::<InstrumentDefMsgV1>().unwrap();
195
196                    let raw_symbol = rec.raw_symbol().expect("Error decoding `raw_symbol`");
197                    let symbol = Symbol::from(raw_symbol);
198
199                    let publisher = rec.hd.publisher().expect("Invalid `publisher` for record");
200                    let venue = match publisher {
201                        Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
202                            // SAFETY: GLBX instruments have a valid `exchange` field
203                            let exchange = rec.exchange().unwrap();
204                            let venue = Venue::from_code(exchange).unwrap_or_else(|_| {
205                                panic!("`Venue` not found for exchange {exchange}")
206                            });
207                            self.symbol_venue_map.insert(symbol, venue);
208                            venue
209                        }
210                        _ => *self
211                            .publisher_venue_map
212                            .get(&msg.hd.publisher_id)
213                            .expect("`Venue` not found `publisher_id`"),
214                    };
215                    let instrument_id = InstrumentId::new(symbol, venue);
216
217                    match decode_instrument_def_msg_v1(rec, instrument_id, msg.ts_recv.into()) {
218                        Ok(data) => Some(Ok(data)),
219                        Err(e) => Some(Err(e)),
220                    }
221                }
222                None => None,
223            }
224        }))
225    }
226
227    pub fn read_records<T>(
228        &self,
229        filepath: &Path,
230        instrument_id: Option<InstrumentId>,
231        price_precision: Option<u8>,
232        include_trades: bool,
233    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
234    where
235        T: dbn::Record + dbn::HasRType + 'static,
236    {
237        let decoder = Decoder::from_zstd_file(filepath)?;
238        let metadata = decoder.metadata().clone();
239        let mut dbn_stream = decoder.decode_stream::<T>();
240
241        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
242
243        Ok(std::iter::from_fn(move || {
244            if let Err(e) = dbn_stream.advance() {
245                return Some(Err(e.into()));
246            }
247            match dbn_stream.get() {
248                Some(rec) => {
249                    let record = dbn::RecordRef::from(rec);
250                    let instrument_id = match &instrument_id {
251                        Some(id) => *id, // Copy
252                        None => decode_nautilus_instrument_id(
253                            &record,
254                            &metadata,
255                            &self.publisher_venue_map,
256                            &self.symbol_venue_map,
257                        )
258                        .expect("Failed to decode record"),
259                    };
260
261                    match decode_record(
262                        &record,
263                        instrument_id,
264                        price_precision,
265                        None,
266                        include_trades,
267                    ) {
268                        Ok(data) => Some(Ok(data)),
269                        Err(e) => Some(Err(e)),
270                    }
271                }
272                None => None,
273            }
274        }))
275    }
276
277    pub fn load_instruments(
278        &mut self,
279        filepath: &Path,
280        use_exchange_as_venue: bool,
281    ) -> anyhow::Result<Vec<InstrumentAny>> {
282        self.read_definition_records(filepath, use_exchange_as_venue)?
283            .collect::<Result<Vec<_>, _>>()
284    }
285
286    // Cannot include trades
287    pub fn load_order_book_deltas(
288        &self,
289        filepath: &Path,
290        instrument_id: Option<InstrumentId>,
291        price_precision: Option<u8>,
292    ) -> anyhow::Result<Vec<OrderBookDelta>> {
293        self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false)?
294            .filter_map(|result| match result {
295                Ok((Some(item1), _)) => {
296                    if let Data::Delta(delta) = item1 {
297                        Some(Ok(delta))
298                    } else {
299                        None
300                    }
301                }
302                Ok((None, _)) => None,
303                Err(e) => Some(Err(e)),
304            })
305            .collect()
306    }
307
308    pub fn load_order_book_depth10(
309        &self,
310        filepath: &Path,
311        instrument_id: Option<InstrumentId>,
312        price_precision: Option<u8>,
313    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
314        self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false)?
315            .filter_map(|result| match result {
316                Ok((Some(item1), _)) => {
317                    if let Data::Depth10(depth) = item1 {
318                        Some(Ok(*depth))
319                    } else {
320                        None
321                    }
322                }
323                Ok((None, _)) => None,
324                Err(e) => Some(Err(e)),
325            })
326            .collect()
327    }
328
329    pub fn load_quotes(
330        &self,
331        filepath: &Path,
332        instrument_id: Option<InstrumentId>,
333        price_precision: Option<u8>,
334    ) -> anyhow::Result<Vec<QuoteTick>> {
335        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false)?
336            .filter_map(|result| match result {
337                Ok((Some(item1), _)) => {
338                    if let Data::Quote(quote) = item1 {
339                        Some(Ok(quote))
340                    } else {
341                        None
342                    }
343                }
344                Ok((None, _)) => None,
345                Err(e) => Some(Err(e)),
346            })
347            .collect()
348    }
349
350    pub fn load_bbo_quotes(
351        &self,
352        filepath: &Path,
353        instrument_id: Option<InstrumentId>,
354        price_precision: Option<u8>,
355    ) -> anyhow::Result<Vec<QuoteTick>> {
356        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false)?
357            .filter_map(|result| match result {
358                Ok((Some(item1), _)) => {
359                    if let Data::Quote(quote) = item1 {
360                        Some(Ok(quote))
361                    } else {
362                        None
363                    }
364                }
365                Ok((None, _)) => None,
366                Err(e) => Some(Err(e)),
367            })
368            .collect()
369    }
370
371    pub fn load_tbbo_trades(
372        &self,
373        filepath: &Path,
374        instrument_id: Option<InstrumentId>,
375        price_precision: Option<u8>,
376    ) -> anyhow::Result<Vec<TradeTick>> {
377        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false)?
378            .filter_map(|result| match result {
379                Ok((_, maybe_item2)) => {
380                    if let Some(Data::Trade(trade)) = maybe_item2 {
381                        Some(Ok(trade))
382                    } else {
383                        None
384                    }
385                }
386                Err(e) => Some(Err(e)),
387            })
388            .collect()
389    }
390
391    pub fn load_trades(
392        &self,
393        filepath: &Path,
394        instrument_id: Option<InstrumentId>,
395        price_precision: Option<u8>,
396    ) -> anyhow::Result<Vec<TradeTick>> {
397        self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false)?
398            .filter_map(|result| match result {
399                Ok((Some(item1), _)) => {
400                    if let Data::Trade(trade) = item1 {
401                        Some(Ok(trade))
402                    } else {
403                        None
404                    }
405                }
406                Ok((None, _)) => None,
407                Err(e) => Some(Err(e)),
408            })
409            .collect()
410    }
411
412    pub fn load_bars(
413        &self,
414        filepath: &Path,
415        instrument_id: Option<InstrumentId>,
416        price_precision: Option<u8>,
417    ) -> anyhow::Result<Vec<Bar>> {
418        self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, price_precision, false)?
419            .filter_map(|result| match result {
420                Ok((Some(item1), _)) => {
421                    if let Data::Bar(bar) = item1 {
422                        Some(Ok(bar))
423                    } else {
424                        None
425                    }
426                }
427                Ok((None, _)) => None,
428                Err(e) => Some(Err(e)),
429            })
430            .collect()
431    }
432
433    pub fn load_status_records<T>(
434        &self,
435        filepath: &Path,
436        instrument_id: Option<InstrumentId>,
437    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
438    where
439        T: dbn::Record + dbn::HasRType + 'static,
440    {
441        let decoder = Decoder::from_zstd_file(filepath)?;
442        let metadata = decoder.metadata().clone();
443        let mut dbn_stream = decoder.decode_stream::<T>();
444
445        Ok(std::iter::from_fn(move || {
446            if let Err(e) = dbn_stream.advance() {
447                return Some(Err(e.into()));
448            }
449            match dbn_stream.get() {
450                Some(rec) => {
451                    let record = dbn::RecordRef::from(rec);
452                    let instrument_id = match &instrument_id {
453                        Some(id) => *id, // Copy
454                        None => decode_nautilus_instrument_id(
455                            &record,
456                            &metadata,
457                            &self.publisher_venue_map,
458                            &self.symbol_venue_map,
459                        )
460                        .expect("Failed to decode record"),
461                    };
462
463                    let msg = record.get::<dbn::StatusMsg>().expect("Invalid `StatusMsg`");
464                    match decode_status_msg(msg, instrument_id, msg.ts_recv.into()) {
465                        Ok(data) => Some(Ok(data)),
466                        Err(e) => Some(Err(e)),
467                    }
468                }
469                None => None,
470            }
471        }))
472    }
473
474    pub fn read_imbalance_records<T>(
475        &self,
476        filepath: &Path,
477        instrument_id: Option<InstrumentId>,
478        price_precision: Option<u8>,
479    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
480    where
481        T: dbn::Record + dbn::HasRType + 'static,
482    {
483        let decoder = Decoder::from_zstd_file(filepath)?;
484        let metadata = decoder.metadata().clone();
485        let mut dbn_stream = decoder.decode_stream::<T>();
486
487        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
488
489        Ok(std::iter::from_fn(move || {
490            if let Err(e) = dbn_stream.advance() {
491                return Some(Err(e.into()));
492            }
493            match dbn_stream.get() {
494                Some(rec) => {
495                    let record = dbn::RecordRef::from(rec);
496                    let instrument_id = match &instrument_id {
497                        Some(id) => *id, // Copy
498                        None => decode_nautilus_instrument_id(
499                            &record,
500                            &metadata,
501                            &self.publisher_venue_map,
502                            &self.symbol_venue_map,
503                        )
504                        .expect("Failed to decode record"),
505                    };
506
507                    let msg = record
508                        .get::<dbn::ImbalanceMsg>()
509                        .expect("Invalid `ImbalanceMsg`");
510                    match decode_imbalance_msg(
511                        msg,
512                        instrument_id,
513                        price_precision,
514                        msg.ts_recv.into(),
515                    ) {
516                        Ok(data) => Some(Ok(data)),
517                        Err(e) => Some(Err(e)),
518                    }
519                }
520                None => None,
521            }
522        }))
523    }
524
525    pub fn read_statistics_records<T>(
526        &self,
527        filepath: &Path,
528        instrument_id: Option<InstrumentId>,
529        price_precision: Option<u8>,
530    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
531    where
532        T: dbn::Record + dbn::HasRType + 'static,
533    {
534        let decoder = Decoder::from_zstd_file(filepath)?;
535        let metadata = decoder.metadata().clone();
536        let mut dbn_stream = decoder.decode_stream::<T>();
537
538        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
539
540        Ok(std::iter::from_fn(move || {
541            if let Err(e) = dbn_stream.advance() {
542                return Some(Err(e.into()));
543            }
544            match dbn_stream.get() {
545                Some(rec) => {
546                    let record = dbn::RecordRef::from(rec);
547                    let instrument_id = match &instrument_id {
548                        Some(id) => *id, // Copy
549                        None => decode_nautilus_instrument_id(
550                            &record,
551                            &metadata,
552                            &self.publisher_venue_map,
553                            &self.symbol_venue_map,
554                        )
555                        .expect("Failed to decode record"),
556                    };
557
558                    let msg = record.get::<dbn::StatMsg>().expect("Invalid `StatMsg`");
559                    match decode_statistics_msg(
560                        msg,
561                        instrument_id,
562                        price_precision,
563                        msg.ts_recv.into(),
564                    ) {
565                        Ok(data) => Some(Ok(data)),
566                        Err(e) => Some(Err(e)),
567                    }
568                }
569                None => None,
570            }
571        }))
572    }
573}
574
575////////////////////////////////////////////////////////////////////////////////
576// Tests
577////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use rstest::*;

    use super::*;

    /// Directory holding the DBN fixture files used by these tests.
    fn test_data_path() -> PathBuf {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        manifest_dir.join("test_data")
    }

    /// Loader backed by the `publishers.json` file at the crate root.
    fn data_loader() -> DatabentoDataLoader {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        let publishers = manifest_dir.join("publishers.json");
        DatabentoDataLoader::new(Some(publishers)).unwrap()
    }

    // TODO: Improve the below assertions that we've actually read the records we expected

    #[rstest]
    // #[case(test_data_path().join("test_data.definition.dbn.zst"))] // TODO: Fails
    #[case(test_data_path().join("test_data.definition.v1.dbn.zst"))]
    fn test_load_instruments(#[case] path: PathBuf) {
        let mut loader = data_loader();

        let loaded = loader.load_instruments(&path, false).unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_deltas() {
        let loader = data_loader();
        let fixture = test_data_path().join("test_data.mbo.dbn.zst");
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let loaded = loader
            .load_order_book_deltas(&fixture, Some(esm4), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10() {
        let loader = data_loader();
        let fixture = test_data_path().join("test_data.mbp-10.dbn.zst");
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let loaded = loader
            .load_order_book_depth10(&fixture, Some(esm4), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_quotes() {
        let loader = data_loader();
        let fixture = test_data_path().join("test_data.mbp-1.dbn.zst");
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let loaded = loader.load_quotes(&fixture, Some(esm4), None).unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(#[case] path: PathBuf) {
        let loader = data_loader();
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let loaded = loader.load_bbo_quotes(&path, Some(esm4), None).unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_tbbo_trades() {
        let loader = data_loader();
        let fixture = test_data_path().join("test_data.tbbo.dbn.zst");
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let _trades = loader
            .load_tbbo_trades(&fixture, Some(esm4), None)
            .unwrap();

        // assert_eq!(trades.len(), 2);  TODO: No records?
    }

    #[rstest]
    fn test_load_trades() {
        let loader = data_loader();
        let fixture = test_data_path().join("test_data.trades.dbn.zst");
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let loaded = loader.load_trades(&fixture, Some(esm4), None).unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    // #[case(test_data_path().join("test_data.ohlcv-1d.dbn.zst"))]  // TODO: Needs new data
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(#[case] path: PathBuf) {
        let loader = data_loader();
        let esm4 = InstrumentId::from("ESM4.GLBX");

        let loaded = loader.load_bars(&path, Some(esm4), None).unwrap();

        assert_eq!(loaded.len(), 2);
    }
}