1use std::{
17 collections::HashMap,
18 env, fs,
19 path::{Path, PathBuf},
20};
21
22use databento::dbn;
23use dbn::{
24 Publisher,
25 compat::InstrumentDefMsgV1,
26 decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32 identifiers::{InstrumentId, Symbol, Venue},
33 instruments::InstrumentAny,
34 types::Currency,
35};
36
37use super::{
38 decode::{
39 decode_imbalance_msg, decode_instrument_def_msg_v1, decode_record, decode_statistics_msg,
40 decode_status_msg,
41 },
42 symbology::decode_nautilus_instrument_id,
43 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
44};
45use crate::symbology::MetadataCache;
46
/// Loader that decodes zstd-compressed DBN files into Nautilus objects.
///
/// Holds the publisher, venue and dataset lookup tables used to resolve the
/// venue (and therefore the instrument ID) of decoded records.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataLoader {
    /// Publisher definitions keyed by publisher ID, filled by `load_publishers`.
    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
    /// Dataset registered for each venue.
    venue_dataset_map: IndexMap<Venue, Dataset>,
    /// Venue of each publisher, keyed by publisher ID.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Venue recorded per symbol by `read_definition_records` when the
    /// exchange is used as the venue.
    symbol_venue_map: HashMap<Symbol, Venue>,
}
80
81impl DatabentoDataLoader {
82 pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
84 let mut loader = Self {
85 publishers_map: IndexMap::new(),
86 venue_dataset_map: IndexMap::new(),
87 publisher_venue_map: IndexMap::new(),
88 symbol_venue_map: HashMap::new(),
89 };
90
91 let publishers_filepath = if let Some(p) = publishers_filepath {
93 p
94 } else {
95 let mut exe_path = env::current_exe()?;
97 exe_path.pop();
98 exe_path.push("publishers.json");
99 exe_path
100 };
101
102 loader
103 .load_publishers(publishers_filepath)
104 .unwrap_or_else(|e| panic!("Error loading publishers.json: {e}"));
105
106 Ok(loader)
107 }
108
109 pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
111 let file_content = fs::read_to_string(filepath)?;
112 let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
113
114 self.publishers_map = publishers
115 .clone()
116 .into_iter()
117 .map(|p| (p.publisher_id, p))
118 .collect();
119
120 let mut venue_dataset_map = IndexMap::new();
121
122 for publisher in &publishers {
124 let venue = Venue::from(publisher.venue.as_str());
125 let dataset = Dataset::from(publisher.dataset.as_str());
126 venue_dataset_map.entry(venue).or_insert(dataset);
127 }
128
129 self.venue_dataset_map = venue_dataset_map;
130
131 let glbx = Dataset::from("GLBX.MDP3");
133 self.venue_dataset_map.insert(Venue::CBCM(), glbx);
134 self.venue_dataset_map.insert(Venue::GLBX(), glbx);
135 self.venue_dataset_map.insert(Venue::NYUM(), glbx);
136 self.venue_dataset_map.insert(Venue::XCBT(), glbx);
137 self.venue_dataset_map.insert(Venue::XCEC(), glbx);
138 self.venue_dataset_map.insert(Venue::XCME(), glbx);
139 self.venue_dataset_map.insert(Venue::XFXS(), glbx);
140 self.venue_dataset_map.insert(Venue::XNYM(), glbx);
141
142 self.publisher_venue_map = publishers
143 .into_iter()
144 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
145 .collect();
146
147 Ok(())
148 }
149
    /// Returns the loaded publisher definitions, keyed by publisher ID.
    #[must_use]
    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
        &self.publishers_map
    }
155
156 pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
158 _ = self.venue_dataset_map.insert(venue, dataset);
159 }
160
161 #[must_use]
163 pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
164 self.venue_dataset_map.get(venue)
165 }
166
167 #[must_use]
169 pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
170 self.publisher_venue_map.get(&publisher_id)
171 }
172
173 pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
175 let decoder = Decoder::from_zstd_file(filepath)?;
176 let metadata = decoder.metadata();
177 Ok(metadata.schema.map(|schema| schema.to_string()))
178 }
179
180 pub fn read_definition_records(
181 &mut self,
182 filepath: &Path,
183 use_exchange_as_venue: bool,
184 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
185 let mut decoder = Decoder::from_zstd_file(filepath)?;
186
187 let upgrade_policy = dbn::VersionUpgradePolicy::AsIs;
190 decoder.set_upgrade_policy(upgrade_policy);
191
192 let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsgV1>();
193
194 Ok(std::iter::from_fn(move || {
195 if let Err(e) = dbn_stream.advance() {
196 return Some(Err(e.into()));
197 }
198 match dbn_stream.get() {
199 Some(rec) => {
200 let record = dbn::RecordRef::from(rec);
201 let msg = record.get::<InstrumentDefMsgV1>().unwrap();
202
203 let raw_symbol = rec.raw_symbol().expect("Error decoding `raw_symbol`");
204 let symbol = Symbol::from(raw_symbol);
205
206 let publisher = rec.hd.publisher().expect("Invalid `publisher` for record");
207 let venue = match publisher {
208 Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
209 let exchange = rec.exchange().unwrap();
211 let venue = Venue::from_code(exchange).unwrap_or_else(|_| {
212 panic!("`Venue` not found for exchange {exchange}")
213 });
214 self.symbol_venue_map.insert(symbol, venue);
215 venue
216 }
217 _ => *self
218 .publisher_venue_map
219 .get(&msg.hd.publisher_id)
220 .expect("`Venue` not found `publisher_id`"),
221 };
222 let instrument_id = InstrumentId::new(symbol, venue);
223
224 match decode_instrument_def_msg_v1(rec, instrument_id, msg.ts_recv.into()) {
225 Ok(data) => Some(Ok(data)),
226 Err(e) => Some(Err(e)),
227 }
228 }
229 None => None,
230 }
231 }))
232 }
233
234 pub fn read_records<T>(
235 &self,
236 filepath: &Path,
237 instrument_id: Option<InstrumentId>,
238 price_precision: Option<u8>,
239 include_trades: bool,
240 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
241 where
242 T: dbn::Record + dbn::HasRType + 'static,
243 {
244 let decoder = Decoder::from_zstd_file(filepath)?;
245 let metadata = decoder.metadata().clone();
246 let mut metadata_cache = MetadataCache::new(metadata);
247 let mut dbn_stream = decoder.decode_stream::<T>();
248
249 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
250
251 Ok(std::iter::from_fn(move || {
252 if let Err(e) = dbn_stream.advance() {
253 return Some(Err(e.into()));
254 }
255 match dbn_stream.get() {
256 Some(rec) => {
257 let record = dbn::RecordRef::from(rec);
258 let instrument_id = match &instrument_id {
259 Some(id) => *id, None => decode_nautilus_instrument_id(
261 &record,
262 &mut metadata_cache,
263 &self.publisher_venue_map,
264 &self.symbol_venue_map,
265 )
266 .expect("Failed to decode record"),
267 };
268
269 match decode_record(
270 &record,
271 instrument_id,
272 price_precision,
273 None,
274 include_trades,
275 ) {
276 Ok(data) => Some(Ok(data)),
277 Err(e) => Some(Err(e)),
278 }
279 }
280 None => None,
281 }
282 }))
283 }
284
285 pub fn load_instruments(
286 &mut self,
287 filepath: &Path,
288 use_exchange_as_venue: bool,
289 ) -> anyhow::Result<Vec<InstrumentAny>> {
290 self.read_definition_records(filepath, use_exchange_as_venue)?
291 .collect::<Result<Vec<_>, _>>()
292 }
293
294 pub fn load_order_book_deltas(
296 &self,
297 filepath: &Path,
298 instrument_id: Option<InstrumentId>,
299 price_precision: Option<u8>,
300 ) -> anyhow::Result<Vec<OrderBookDelta>> {
301 self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false)?
302 .filter_map(|result| match result {
303 Ok((Some(item1), _)) => {
304 if let Data::Delta(delta) = item1 {
305 Some(Ok(delta))
306 } else {
307 None
308 }
309 }
310 Ok((None, _)) => None,
311 Err(e) => Some(Err(e)),
312 })
313 .collect()
314 }
315
316 pub fn load_order_book_depth10(
317 &self,
318 filepath: &Path,
319 instrument_id: Option<InstrumentId>,
320 price_precision: Option<u8>,
321 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
322 self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false)?
323 .filter_map(|result| match result {
324 Ok((Some(item1), _)) => {
325 if let Data::Depth10(depth) = item1 {
326 Some(Ok(*depth))
327 } else {
328 None
329 }
330 }
331 Ok((None, _)) => None,
332 Err(e) => Some(Err(e)),
333 })
334 .collect()
335 }
336
337 pub fn load_quotes(
338 &self,
339 filepath: &Path,
340 instrument_id: Option<InstrumentId>,
341 price_precision: Option<u8>,
342 ) -> anyhow::Result<Vec<QuoteTick>> {
343 self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false)?
344 .filter_map(|result| match result {
345 Ok((Some(item1), _)) => {
346 if let Data::Quote(quote) = item1 {
347 Some(Ok(quote))
348 } else {
349 None
350 }
351 }
352 Ok((None, _)) => None,
353 Err(e) => Some(Err(e)),
354 })
355 .collect()
356 }
357
358 pub fn load_bbo_quotes(
359 &self,
360 filepath: &Path,
361 instrument_id: Option<InstrumentId>,
362 price_precision: Option<u8>,
363 ) -> anyhow::Result<Vec<QuoteTick>> {
364 self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false)?
365 .filter_map(|result| match result {
366 Ok((Some(item1), _)) => {
367 if let Data::Quote(quote) = item1 {
368 Some(Ok(quote))
369 } else {
370 None
371 }
372 }
373 Ok((None, _)) => None,
374 Err(e) => Some(Err(e)),
375 })
376 .collect()
377 }
378
379 pub fn load_tbbo_trades(
380 &self,
381 filepath: &Path,
382 instrument_id: Option<InstrumentId>,
383 price_precision: Option<u8>,
384 ) -> anyhow::Result<Vec<TradeTick>> {
385 self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false)?
386 .filter_map(|result| match result {
387 Ok((_, maybe_item2)) => {
388 if let Some(Data::Trade(trade)) = maybe_item2 {
389 Some(Ok(trade))
390 } else {
391 None
392 }
393 }
394 Err(e) => Some(Err(e)),
395 })
396 .collect()
397 }
398
399 pub fn load_trades(
400 &self,
401 filepath: &Path,
402 instrument_id: Option<InstrumentId>,
403 price_precision: Option<u8>,
404 ) -> anyhow::Result<Vec<TradeTick>> {
405 self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false)?
406 .filter_map(|result| match result {
407 Ok((Some(item1), _)) => {
408 if let Data::Trade(trade) = item1 {
409 Some(Ok(trade))
410 } else {
411 None
412 }
413 }
414 Ok((None, _)) => None,
415 Err(e) => Some(Err(e)),
416 })
417 .collect()
418 }
419
420 pub fn load_bars(
421 &self,
422 filepath: &Path,
423 instrument_id: Option<InstrumentId>,
424 price_precision: Option<u8>,
425 ) -> anyhow::Result<Vec<Bar>> {
426 self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, price_precision, false)?
427 .filter_map(|result| match result {
428 Ok((Some(item1), _)) => {
429 if let Data::Bar(bar) = item1 {
430 Some(Ok(bar))
431 } else {
432 None
433 }
434 }
435 Ok((None, _)) => None,
436 Err(e) => Some(Err(e)),
437 })
438 .collect()
439 }
440
441 pub fn load_status_records<T>(
442 &self,
443 filepath: &Path,
444 instrument_id: Option<InstrumentId>,
445 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
446 where
447 T: dbn::Record + dbn::HasRType + 'static,
448 {
449 let decoder = Decoder::from_zstd_file(filepath)?;
450 let metadata = decoder.metadata().clone();
451 let mut metadata_cache = MetadataCache::new(metadata);
452 let mut dbn_stream = decoder.decode_stream::<T>();
453
454 Ok(std::iter::from_fn(move || {
455 if let Err(e) = dbn_stream.advance() {
456 return Some(Err(e.into()));
457 }
458 match dbn_stream.get() {
459 Some(rec) => {
460 let record = dbn::RecordRef::from(rec);
461 let instrument_id = match &instrument_id {
462 Some(id) => *id, None => decode_nautilus_instrument_id(
464 &record,
465 &mut metadata_cache,
466 &self.publisher_venue_map,
467 &self.symbol_venue_map,
468 )
469 .expect("Failed to decode record"),
470 };
471
472 let msg = record.get::<dbn::StatusMsg>().expect("Invalid `StatusMsg`");
473 match decode_status_msg(msg, instrument_id, msg.ts_recv.into()) {
474 Ok(data) => Some(Ok(data)),
475 Err(e) => Some(Err(e)),
476 }
477 }
478 None => None,
479 }
480 }))
481 }
482
483 pub fn read_imbalance_records<T>(
484 &self,
485 filepath: &Path,
486 instrument_id: Option<InstrumentId>,
487 price_precision: Option<u8>,
488 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
489 where
490 T: dbn::Record + dbn::HasRType + 'static,
491 {
492 let decoder = Decoder::from_zstd_file(filepath)?;
493 let metadata = decoder.metadata().clone();
494 let mut metadata_cache = MetadataCache::new(metadata);
495 let mut dbn_stream = decoder.decode_stream::<T>();
496
497 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
498
499 Ok(std::iter::from_fn(move || {
500 if let Err(e) = dbn_stream.advance() {
501 return Some(Err(e.into()));
502 }
503 match dbn_stream.get() {
504 Some(rec) => {
505 let record = dbn::RecordRef::from(rec);
506 let instrument_id = match &instrument_id {
507 Some(id) => *id, None => decode_nautilus_instrument_id(
509 &record,
510 &mut metadata_cache,
511 &self.publisher_venue_map,
512 &self.symbol_venue_map,
513 )
514 .expect("Failed to decode record"),
515 };
516
517 let msg = record
518 .get::<dbn::ImbalanceMsg>()
519 .expect("Invalid `ImbalanceMsg`");
520 match decode_imbalance_msg(
521 msg,
522 instrument_id,
523 price_precision,
524 msg.ts_recv.into(),
525 ) {
526 Ok(data) => Some(Ok(data)),
527 Err(e) => Some(Err(e)),
528 }
529 }
530 None => None,
531 }
532 }))
533 }
534
535 pub fn read_statistics_records<T>(
536 &self,
537 filepath: &Path,
538 instrument_id: Option<InstrumentId>,
539 price_precision: Option<u8>,
540 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
541 where
542 T: dbn::Record + dbn::HasRType + 'static,
543 {
544 let decoder = Decoder::from_zstd_file(filepath)?;
545 let metadata = decoder.metadata().clone();
546 let mut metadata_cache = MetadataCache::new(metadata);
547 let mut dbn_stream = decoder.decode_stream::<T>();
548
549 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
550
551 Ok(std::iter::from_fn(move || {
552 if let Err(e) = dbn_stream.advance() {
553 return Some(Err(e.into()));
554 }
555 match dbn_stream.get() {
556 Some(rec) => {
557 let record = dbn::RecordRef::from(rec);
558 let instrument_id = match &instrument_id {
559 Some(id) => *id, None => decode_nautilus_instrument_id(
561 &record,
562 &mut metadata_cache,
563 &self.publisher_venue_map,
564 &self.symbol_venue_map,
565 )
566 .expect("Failed to decode record"),
567 };
568
569 let msg = record.get::<dbn::StatMsg>().expect("Invalid `StatMsg`");
570 match decode_statistics_msg(
571 msg,
572 instrument_id,
573 price_precision,
574 msg.ts_recv.into(),
575 ) {
576 Ok(data) => Some(Ok(data)),
577 Err(e) => Some(Err(e)),
578 }
579 }
580 None => None,
581 }
582 }))
583 }
584}
585
// Loader tests: each one decodes a fixture file from the crate's `test_data` directory,
// using the `publishers.json` file at the crate root for the lookup tables.
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use rstest::{fixture, rstest};
    use ustr::Ustr;

    use super::*;

    /// Returns the directory that holds the DBN fixture files.
    fn test_data_path() -> PathBuf {
        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
    }

    /// Builds a loader from the `publishers.json` file at the crate root.
    #[fixture]
    fn loader() -> DatabentoDataLoader {
        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
    }

    // A dataset set for a venue must be returned by the matching getter
    #[rstest]
    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
        let dataset = Ustr::from("EQUS.PLUS");
        let venue = Venue::from("XNAS");
        loader.set_dataset_for_venue(dataset, venue);

        let result = loader.get_dataset_for_venue(&venue).unwrap();
        assert_eq!(*result, dataset);
    }

    // Definitions are loaded without using the exchange as the venue
    #[rstest]
    #[case(test_data_path().join("test_data.definition.v1.dbn.zst"))]
    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instruments = loader.load_instruments(&path, false).unwrap();

        assert_eq!(instruments.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let deltas = loader
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let depths = loader
            .load_order_book_depth10(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(depths.len(), 2);
    }

    #[rstest]
    fn test_load_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);
    }

    // Runs once per BBO interval fixture
    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_bbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);
    }

    // NOTE(review): this test makes no assertion on the returned trades; it only checks
    // that loading the TBBO fixture does not fail.
    #[rstest]
    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.tbbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let _trades = loader
            .load_tbbo_trades(&path, Some(instrument_id), None)
            .unwrap();

    }

    #[rstest]
    fn test_load_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.trades.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trades = loader
            .load_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
    }

    // Runs once per OHLCV interval fixture
    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader.load_bars(&path, Some(instrument_id), None).unwrap();

        assert_eq!(bars.len(), 2);
    }
}