1use std::{
17 env, fs,
18 path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25 Publisher,
26 decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32 identifiers::{InstrumentId, Symbol, Venue},
33 instruments::InstrumentAny,
34 types::Currency,
35};
36
37use super::{
38 decode::{decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg},
39 symbology::decode_nautilus_instrument_id,
40 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
41};
42use crate::{decode::decode_instrument_def_msg, symbology::MetadataCache};
43
44#[cfg_attr(
72 feature = "python",
73 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
74)]
75#[derive(Debug)]
76pub struct DatabentoDataLoader {
77 publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
78 venue_dataset_map: IndexMap<Venue, Dataset>,
79 publisher_venue_map: IndexMap<PublisherId, Venue>,
80 symbol_venue_map: AHashMap<Symbol, Venue>,
81}
82
83impl DatabentoDataLoader {
84 pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
90 let mut loader = Self {
91 publishers_map: IndexMap::new(),
92 venue_dataset_map: IndexMap::new(),
93 publisher_venue_map: IndexMap::new(),
94 symbol_venue_map: AHashMap::new(),
95 };
96
97 let publishers_filepath = if let Some(p) = publishers_filepath {
99 p
100 } else {
101 let mut exe_path = env::current_exe()?;
103 exe_path.pop();
104 exe_path.push("publishers.json");
105 exe_path
106 };
107
108 loader
109 .load_publishers(publishers_filepath)
110 .context("Error loading publishers.json")?;
111
112 Ok(loader)
113 }
114
115 pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
121 let file_content = fs::read_to_string(filepath)?;
122 let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
123
124 self.publishers_map = publishers
125 .clone()
126 .into_iter()
127 .map(|p| (p.publisher_id, p))
128 .collect();
129
130 let mut venue_dataset_map = IndexMap::new();
131
132 for publisher in &publishers {
134 let venue = Venue::from(publisher.venue.as_str());
135 let dataset = Dataset::from(publisher.dataset.as_str());
136 venue_dataset_map.entry(venue).or_insert(dataset);
137 }
138
139 self.venue_dataset_map = venue_dataset_map;
140
141 let glbx = Dataset::from("GLBX.MDP3");
143 self.venue_dataset_map.insert(Venue::CBCM(), glbx);
144 self.venue_dataset_map.insert(Venue::GLBX(), glbx);
145 self.venue_dataset_map.insert(Venue::NYUM(), glbx);
146 self.venue_dataset_map.insert(Venue::XCBT(), glbx);
147 self.venue_dataset_map.insert(Venue::XCEC(), glbx);
148 self.venue_dataset_map.insert(Venue::XCME(), glbx);
149 self.venue_dataset_map.insert(Venue::XFXS(), glbx);
150 self.venue_dataset_map.insert(Venue::XNYM(), glbx);
151
152 self.publisher_venue_map = publishers
153 .into_iter()
154 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
155 .collect();
156
157 Ok(())
158 }
159
160 #[must_use]
162 pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
163 &self.publishers_map
164 }
165
166 pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
168 _ = self.venue_dataset_map.insert(venue, dataset);
169 }
170
171 #[must_use]
173 pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
174 self.venue_dataset_map.get(venue)
175 }
176
177 #[must_use]
179 pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
180 self.publisher_venue_map.get(&publisher_id)
181 }
182
183 pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
189 let decoder = Decoder::from_zstd_file(filepath)?;
190 let metadata = decoder.metadata();
191 Ok(metadata.schema.map(|schema| schema.to_string()))
192 }
193
194 pub fn read_definition_records(
200 &mut self,
201 filepath: &Path,
202 use_exchange_as_venue: bool,
203 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
204 let decoder = Decoder::from_zstd_file(filepath)?;
205 let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
206
207 Ok(std::iter::from_fn(move || {
208 let result: anyhow::Result<Option<InstrumentAny>> = (|| {
209 dbn_stream
210 .advance()
211 .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
212
213 if let Some(rec) = dbn_stream.get() {
214 let record = dbn::RecordRef::from(rec);
215 let msg = record
216 .get::<InstrumentDefMsg>()
217 .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
218
219 let raw_symbol = rec
221 .raw_symbol()
222 .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
223 let symbol = Symbol::from(raw_symbol);
224
225 let publisher = rec
226 .hd
227 .publisher()
228 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
229 let venue = match publisher {
230 Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
231 let exchange = rec.exchange().map_err(|e| {
232 anyhow::anyhow!("Missing `exchange` for record: {e}")
233 })?;
234 let venue = Venue::from_code(exchange).map_err(|e| {
235 anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
236 })?;
237 self.symbol_venue_map.insert(symbol, venue);
238 venue
239 }
240 _ => *self
241 .publisher_venue_map
242 .get(&msg.hd.publisher_id)
243 .ok_or_else(|| {
244 anyhow::anyhow!(
245 "Venue not found for publisher_id {}",
246 msg.hd.publisher_id
247 )
248 })?,
249 };
250 let instrument_id = InstrumentId::new(symbol, venue);
251 let ts_init = msg.ts_recv.into();
252
253 let data = decode_instrument_def_msg(rec, instrument_id, Some(ts_init))?;
254 Ok(Some(data))
255 } else {
256 Ok(None)
258 }
259 })();
260
261 match result {
262 Ok(Some(item)) => Some(Ok(item)),
263 Ok(None) => None,
264 Err(e) => Some(Err(e)),
265 }
266 }))
267 }
268
269 pub fn read_records<T>(
275 &self,
276 filepath: &Path,
277 instrument_id: Option<InstrumentId>,
278 price_precision: Option<u8>,
279 include_trades: bool,
280 bars_timestamp_on_close: Option<bool>,
281 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
282 where
283 T: dbn::Record + dbn::HasRType + 'static,
284 {
285 let decoder = Decoder::from_zstd_file(filepath)?;
286 let metadata = decoder.metadata().clone();
287 let mut metadata_cache = MetadataCache::new(metadata);
288 let mut dbn_stream = decoder.decode_stream::<T>();
289
290 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
291
292 Ok(std::iter::from_fn(move || {
293 let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
294 dbn_stream
295 .advance()
296 .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
297 if let Some(rec) = dbn_stream.get() {
298 let record = dbn::RecordRef::from(rec);
299 let instrument_id = if let Some(id) = &instrument_id {
300 *id
301 } else {
302 decode_nautilus_instrument_id(
303 &record,
304 &mut metadata_cache,
305 &self.publisher_venue_map,
306 &self.symbol_venue_map,
307 )
308 .context("Failed to decode instrument id")?
309 };
310 let (item1, item2) = decode_record(
311 &record,
312 instrument_id,
313 price_precision,
314 None,
315 include_trades,
316 bars_timestamp_on_close.unwrap_or(true),
317 )?;
318 Ok(Some((item1, item2)))
319 } else {
320 Ok(None)
321 }
322 })();
323 match result {
324 Ok(Some(v)) => Some(Ok(v)),
325 Ok(None) => None,
326 Err(e) => Some(Err(e)),
327 }
328 }))
329 }
330
331 pub fn load_instruments(
337 &mut self,
338 filepath: &Path,
339 use_exchange_as_venue: bool,
340 ) -> anyhow::Result<Vec<InstrumentAny>> {
341 self.read_definition_records(filepath, use_exchange_as_venue)?
342 .collect::<Result<Vec<_>, _>>()
343 }
344
345 pub fn load_order_book_deltas(
353 &self,
354 filepath: &Path,
355 instrument_id: Option<InstrumentId>,
356 price_precision: Option<u8>,
357 ) -> anyhow::Result<Vec<OrderBookDelta>> {
358 self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?
359 .filter_map(|result| match result {
360 Ok((Some(item1), _)) => {
361 if let Data::Delta(delta) = item1 {
362 Some(Ok(delta))
363 } else {
364 None
365 }
366 }
367 Ok((None, _)) => None,
368 Err(e) => Some(Err(e)),
369 })
370 .collect()
371 }
372
373 pub fn load_order_book_depth10(
379 &self,
380 filepath: &Path,
381 instrument_id: Option<InstrumentId>,
382 price_precision: Option<u8>,
383 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
384 self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
385 .filter_map(|result| match result {
386 Ok((Some(item1), _)) => {
387 if let Data::Depth10(depth) = item1 {
388 Some(Ok(*depth))
389 } else {
390 None
391 }
392 }
393 Ok((None, _)) => None,
394 Err(e) => Some(Err(e)),
395 })
396 .collect()
397 }
398
399 pub fn load_quotes(
405 &self,
406 filepath: &Path,
407 instrument_id: Option<InstrumentId>,
408 price_precision: Option<u8>,
409 ) -> anyhow::Result<Vec<QuoteTick>> {
410 self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
411 .filter_map(|result| match result {
412 Ok((Some(item1), _)) => {
413 if let Data::Quote(quote) = item1 {
414 Some(Ok(quote))
415 } else {
416 None
417 }
418 }
419 Ok((None, _)) => None,
420 Err(e) => Some(Err(e)),
421 })
422 .collect()
423 }
424
425 pub fn load_bbo_quotes(
431 &self,
432 filepath: &Path,
433 instrument_id: Option<InstrumentId>,
434 price_precision: Option<u8>,
435 ) -> anyhow::Result<Vec<QuoteTick>> {
436 self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
437 .filter_map(|result| match result {
438 Ok((Some(item1), _)) => {
439 if let Data::Quote(quote) = item1 {
440 Some(Ok(quote))
441 } else {
442 None
443 }
444 }
445 Ok((None, _)) => None,
446 Err(e) => Some(Err(e)),
447 })
448 .collect()
449 }
450
451 pub fn load_cmbp_quotes(
457 &self,
458 filepath: &Path,
459 instrument_id: Option<InstrumentId>,
460 price_precision: Option<u8>,
461 ) -> anyhow::Result<Vec<QuoteTick>> {
462 self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
463 .filter_map(|result| match result {
464 Ok((Some(item1), _)) => {
465 if let Data::Quote(quote) = item1 {
466 Some(Ok(quote))
467 } else {
468 None
469 }
470 }
471 Ok((None, _)) => None,
472 Err(e) => Some(Err(e)),
473 })
474 .collect()
475 }
476
477 pub fn load_cbbo_quotes(
483 &self,
484 filepath: &Path,
485 instrument_id: Option<InstrumentId>,
486 price_precision: Option<u8>,
487 ) -> anyhow::Result<Vec<QuoteTick>> {
488 self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
489 .filter_map(|result| match result {
490 Ok((Some(item1), _)) => {
491 if let Data::Quote(quote) = item1 {
492 Some(Ok(quote))
493 } else {
494 None
495 }
496 }
497 Ok((None, _)) => None,
498 Err(e) => Some(Err(e)),
499 })
500 .collect()
501 }
502
503 pub fn load_tbbo_trades(
509 &self,
510 filepath: &Path,
511 instrument_id: Option<InstrumentId>,
512 price_precision: Option<u8>,
513 ) -> anyhow::Result<Vec<TradeTick>> {
514 self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?
515 .filter_map(|result| match result {
516 Ok((_, maybe_item2)) => {
517 if let Some(Data::Trade(trade)) = maybe_item2 {
518 Some(Ok(trade))
519 } else {
520 None
521 }
522 }
523 Err(e) => Some(Err(e)),
524 })
525 .collect()
526 }
527
528 pub fn load_tcbbo_trades(
534 &self,
535 filepath: &Path,
536 instrument_id: Option<InstrumentId>,
537 price_precision: Option<u8>,
538 ) -> anyhow::Result<Vec<TradeTick>> {
539 self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
540 .filter_map(|result| match result {
541 Ok((_, maybe_item2)) => {
542 if let Some(Data::Trade(trade)) = maybe_item2 {
543 Some(Ok(trade))
544 } else {
545 None
546 }
547 }
548 Err(e) => Some(Err(e)),
549 })
550 .collect()
551 }
552
553 pub fn load_trades(
559 &self,
560 filepath: &Path,
561 instrument_id: Option<InstrumentId>,
562 price_precision: Option<u8>,
563 ) -> anyhow::Result<Vec<TradeTick>> {
564 self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
565 .filter_map(|result| match result {
566 Ok((Some(item1), _)) => {
567 if let Data::Trade(trade) = item1 {
568 Some(Ok(trade))
569 } else {
570 None
571 }
572 }
573 Ok((None, _)) => None,
574 Err(e) => Some(Err(e)),
575 })
576 .collect()
577 }
578
579 pub fn load_bars(
585 &self,
586 filepath: &Path,
587 instrument_id: Option<InstrumentId>,
588 price_precision: Option<u8>,
589 timestamp_on_close: Option<bool>,
590 ) -> anyhow::Result<Vec<Bar>> {
591 self.read_records::<dbn::OhlcvMsg>(
592 filepath,
593 instrument_id,
594 price_precision,
595 false,
596 timestamp_on_close,
597 )?
598 .filter_map(|result| match result {
599 Ok((Some(item1), _)) => {
600 if let Data::Bar(bar) = item1 {
601 Some(Ok(bar))
602 } else {
603 None
604 }
605 }
606 Ok((None, _)) => None,
607 Err(e) => Some(Err(e)),
608 })
609 .collect()
610 }
611
612 pub fn load_status_records<T>(
618 &self,
619 filepath: &Path,
620 instrument_id: Option<InstrumentId>,
621 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
622 where
623 T: dbn::Record + dbn::HasRType + 'static,
624 {
625 let decoder = Decoder::from_zstd_file(filepath)?;
626 let metadata = decoder.metadata().clone();
627 let mut metadata_cache = MetadataCache::new(metadata);
628 let mut dbn_stream = decoder.decode_stream::<T>();
629
630 Ok(std::iter::from_fn(move || {
631 if let Err(e) = dbn_stream.advance() {
632 return Some(Err(e.into()));
633 }
634 match dbn_stream.get() {
635 Some(rec) => {
636 let record = dbn::RecordRef::from(rec);
637 let instrument_id = match &instrument_id {
638 Some(id) => *id, None => match decode_nautilus_instrument_id(
640 &record,
641 &mut metadata_cache,
642 &self.publisher_venue_map,
643 &self.symbol_venue_map,
644 ) {
645 Ok(id) => id,
646 Err(e) => return Some(Err(e)),
647 },
648 };
649
650 let msg = match record.get::<dbn::StatusMsg>() {
651 Some(m) => m,
652 None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
653 };
654 let ts_init = msg.ts_recv.into();
655
656 match decode_status_msg(msg, instrument_id, Some(ts_init)) {
657 Ok(data) => Some(Ok(data)),
658 Err(e) => Some(Err(e)),
659 }
660 }
661 None => None,
662 }
663 }))
664 }
665
666 pub fn read_imbalance_records<T>(
672 &self,
673 filepath: &Path,
674 instrument_id: Option<InstrumentId>,
675 price_precision: Option<u8>,
676 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
677 where
678 T: dbn::Record + dbn::HasRType + 'static,
679 {
680 let decoder = Decoder::from_zstd_file(filepath)?;
681 let metadata = decoder.metadata().clone();
682 let mut metadata_cache = MetadataCache::new(metadata);
683 let mut dbn_stream = decoder.decode_stream::<T>();
684
685 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
686
687 Ok(std::iter::from_fn(move || {
688 if let Err(e) = dbn_stream.advance() {
689 return Some(Err(e.into()));
690 }
691 match dbn_stream.get() {
692 Some(rec) => {
693 let record = dbn::RecordRef::from(rec);
694 let instrument_id = match &instrument_id {
695 Some(id) => *id, None => match decode_nautilus_instrument_id(
697 &record,
698 &mut metadata_cache,
699 &self.publisher_venue_map,
700 &self.symbol_venue_map,
701 ) {
702 Ok(id) => id,
703 Err(e) => return Some(Err(e)),
704 },
705 };
706
707 let msg = match record.get::<dbn::ImbalanceMsg>() {
708 Some(m) => m,
709 None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
710 };
711 let ts_init = msg.ts_recv.into();
712
713 match decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init)) {
714 Ok(data) => Some(Ok(data)),
715 Err(e) => Some(Err(e)),
716 }
717 }
718 None => None,
719 }
720 }))
721 }
722
723 pub fn read_statistics_records<T>(
729 &self,
730 filepath: &Path,
731 instrument_id: Option<InstrumentId>,
732 price_precision: Option<u8>,
733 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
734 where
735 T: dbn::Record + dbn::HasRType + 'static,
736 {
737 let decoder = Decoder::from_zstd_file(filepath)?;
738 let metadata = decoder.metadata().clone();
739 let mut metadata_cache = MetadataCache::new(metadata);
740 let mut dbn_stream = decoder.decode_stream::<T>();
741
742 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
743
744 Ok(std::iter::from_fn(move || {
745 if let Err(e) = dbn_stream.advance() {
746 return Some(Err(e.into()));
747 }
748 match dbn_stream.get() {
749 Some(rec) => {
750 let record = dbn::RecordRef::from(rec);
751 let instrument_id = match &instrument_id {
752 Some(id) => *id, None => match decode_nautilus_instrument_id(
754 &record,
755 &mut metadata_cache,
756 &self.publisher_venue_map,
757 &self.symbol_venue_map,
758 ) {
759 Ok(id) => id,
760 Err(e) => return Some(Err(e)),
761 },
762 };
763 let msg = match record.get::<dbn::StatMsg>() {
764 Some(m) => m,
765 None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
766 };
767 let ts_init = msg.ts_recv.into();
768
769 match decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
770 {
771 Ok(data) => Some(Ok(data)),
772 Err(e) => Some(Err(e)),
773 }
774 }
775 None => None,
776 }
777 }))
778 }
779}
780
781#[cfg(test)]
785mod tests {
786 use std::path::{Path, PathBuf};
787
788 use nautilus_model::types::{Price, Quantity};
789 use rstest::{fixture, rstest};
790 use ustr::Ustr;
791
792 use super::*;
793
794 fn test_data_path() -> PathBuf {
795 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
796 }
797
798 #[fixture]
799 fn loader() -> DatabentoDataLoader {
800 let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
801 DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
802 }
803
804 #[rstest]
807 fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
808 let dataset = Ustr::from("EQUS.PLUS");
809 let venue = Venue::from("XNAS");
810 loader.set_dataset_for_venue(dataset, venue);
811
812 let result = loader.get_dataset_for_venue(&venue).unwrap();
813 assert_eq!(*result, dataset);
814 }
815
816 #[rstest]
817 #[case(test_data_path().join("test_data.definition.dbn.zst"))]
818 fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
819 let instruments = loader.load_instruments(&path, false).unwrap();
820
821 assert_eq!(instruments.len(), 2);
822 }
823
824 #[rstest]
825 fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
826 let path = test_data_path().join("test_data.mbo.dbn.zst");
827 let instrument_id = InstrumentId::from("ESM4.GLBX");
828
829 let deltas = loader
830 .load_order_book_deltas(&path, Some(instrument_id), None)
831 .unwrap();
832
833 assert_eq!(deltas.len(), 2);
834 }
835
836 #[rstest]
837 fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
838 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
839 let instrument_id = InstrumentId::from("ESM4.GLBX");
840
841 let depths = loader
842 .load_order_book_depth10(&path, Some(instrument_id), None)
843 .unwrap();
844
845 assert_eq!(depths.len(), 2);
846 }
847
848 #[rstest]
849 fn test_load_quotes(loader: DatabentoDataLoader) {
850 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
851 let instrument_id = InstrumentId::from("ESM4.GLBX");
852
853 let quotes = loader
854 .load_quotes(&path, Some(instrument_id), None)
855 .unwrap();
856
857 assert_eq!(quotes.len(), 2);
858 }
859
860 #[rstest]
861 #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
862 #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
863 fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
864 let instrument_id = InstrumentId::from("ESM4.GLBX");
865
866 let quotes = loader
867 .load_bbo_quotes(&path, Some(instrument_id), None)
868 .unwrap();
869
870 assert_eq!(quotes.len(), 4);
871 }
872
873 #[rstest]
874 fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
875 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
876 let instrument_id = InstrumentId::from("ESM4.GLBX");
877
878 let quotes = loader
879 .load_cmbp_quotes(&path, Some(instrument_id), None)
880 .unwrap();
881
882 assert_eq!(quotes.len(), 2);
884
885 let first_quote = "es[0];
887 assert_eq!(first_quote.instrument_id, instrument_id);
888 assert_eq!(first_quote.bid_price, Price::from("3720.25"));
889 assert_eq!(first_quote.ask_price, Price::from("3720.50"));
890 assert_eq!(first_quote.bid_size, Quantity::from(24));
891 assert_eq!(first_quote.ask_size, Quantity::from(11));
892 assert_eq!(first_quote.ts_event, 1609160400006136329);
893 assert_eq!(first_quote.ts_init, 1609160400006136329);
894 }
895
896 #[rstest]
897 fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
898 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
899 let instrument_id = InstrumentId::from("ESM4.GLBX");
900
901 let quotes = loader
902 .load_cbbo_quotes(&path, Some(instrument_id), None)
903 .unwrap();
904
905 assert_eq!(quotes.len(), 2);
907
908 let first_quote = "es[0];
910 assert_eq!(first_quote.instrument_id, instrument_id);
911 assert_eq!(first_quote.bid_price, Price::from("3720.25"));
912 assert_eq!(first_quote.ask_price, Price::from("3720.50"));
913 assert_eq!(first_quote.bid_size, Quantity::from(24));
914 assert_eq!(first_quote.ask_size, Quantity::from(11));
915 assert_eq!(first_quote.ts_event, 1609160400006136329);
916 assert_eq!(first_quote.ts_init, 1609160400006136329);
917 }
918
919 #[rstest]
920 fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
921 let path = test_data_path().join("test_data.tbbo.dbn.zst");
922 let instrument_id = InstrumentId::from("ESM4.GLBX");
923
924 let trades = loader
925 .load_tbbo_trades(&path, Some(instrument_id), None)
926 .unwrap();
927
928 assert_eq!(trades.len(), 0);
930 }
931
932 #[rstest]
933 fn test_load_tcbbo_trades(loader: DatabentoDataLoader) {
934 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
937 let instrument_id = InstrumentId::from("ESM4.GLBX");
938
939 let result = loader.load_tcbbo_trades(&path, Some(instrument_id), None);
940
941 assert!(result.is_ok());
942 let trades = result.unwrap();
943 assert_eq!(trades.len(), 2);
944 }
945
946 #[rstest]
947 fn test_load_trades(loader: DatabentoDataLoader) {
948 let path = test_data_path().join("test_data.trades.dbn.zst");
949 let instrument_id = InstrumentId::from("ESM4.GLBX");
950 let trades = loader
951 .load_trades(&path, Some(instrument_id), None)
952 .unwrap();
953
954 assert_eq!(trades.len(), 2);
955 }
956
957 #[rstest]
958 #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
960 #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
961 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
962 fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
963 let instrument_id = InstrumentId::from("ESM4.GLBX");
964 let bars = loader
965 .load_bars(&path, Some(instrument_id), None, None)
966 .unwrap();
967
968 assert_eq!(bars.len(), 2);
969 }
970
971 #[rstest]
972 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
973 fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
974 let instrument_id = InstrumentId::from("ESM4.GLBX");
975 let bars = loader
976 .load_bars(&path, Some(instrument_id), None, Some(true))
977 .unwrap();
978
979 assert_eq!(bars.len(), 2);
980
981 for bar in &bars {
983 assert_eq!(
984 bar.ts_event, bar.ts_init,
985 "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
986 );
987 }
988 }
989
990 #[rstest]
991 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
992 fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
993 let instrument_id = InstrumentId::from("ESM4.GLBX");
994 let bars = loader
995 .load_bars(&path, Some(instrument_id), None, Some(false))
996 .unwrap();
997
998 assert_eq!(bars.len(), 2);
999
1000 for bar in &bars {
1002 assert_ne!(
1003 bar.ts_event, bar.ts_init,
1004 "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
1005 );
1006 assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + 1_000_000_000);
1008 }
1009 }
1010
1011 #[rstest]
1012 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
1013 #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
1014 fn test_load_bars_timestamp_comparison(
1015 loader: DatabentoDataLoader,
1016 #[case] path: PathBuf,
1017 #[case] bar_index: usize,
1018 ) {
1019 let instrument_id = InstrumentId::from("ESM4.GLBX");
1020
1021 let bars_close = loader
1022 .load_bars(&path, Some(instrument_id), None, Some(true))
1023 .unwrap();
1024
1025 let bars_open = loader
1026 .load_bars(&path, Some(instrument_id), None, Some(false))
1027 .unwrap();
1028
1029 assert_eq!(bars_close.len(), bars_open.len());
1030 assert_eq!(bars_close.len(), 2);
1031
1032 let bar_close = &bars_close[bar_index];
1033 let bar_open = &bars_open[bar_index];
1034
1035 assert_eq!(bar_close.open, bar_open.open);
1037 assert_eq!(bar_close.high, bar_open.high);
1038 assert_eq!(bar_close.low, bar_open.low);
1039 assert_eq!(bar_close.close, bar_open.close);
1040 assert_eq!(bar_close.volume, bar_open.volume);
1041
1042 assert!(
1045 bar_close.ts_event > bar_open.ts_event,
1046 "Close-timestamped bar should have later timestamp than open-timestamped bar"
1047 );
1048
1049 const ONE_SECOND_NS: u64 = 1_000_000_000;
1051 assert_eq!(
1052 bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
1053 ONE_SECOND_NS,
1054 "Timestamp difference should be exactly 1 second for 1s bars"
1055 );
1056 }
1057}