1use std::{
17 collections::HashMap,
18 env, fs,
19 path::{Path, PathBuf},
20};
21
22use databento::dbn;
23use dbn::{
24 Publisher,
25 compat::InstrumentDefMsgV1,
26 decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32 identifiers::{InstrumentId, Symbol, Venue},
33 instruments::InstrumentAny,
34 types::Currency,
35};
36use ustr::Ustr;
37
38use super::{
39 decode::{
40 decode_imbalance_msg, decode_instrument_def_msg_v1, decode_record, decode_statistics_msg,
41 decode_status_msg,
42 },
43 symbology::decode_nautilus_instrument_id,
44 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
45};
46
47#[cfg_attr(
70 feature = "python",
71 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
72)]
73#[derive(Debug)]
74pub struct DatabentoDataLoader {
75 publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
76 venue_dataset_map: IndexMap<Venue, Dataset>,
77 publisher_venue_map: IndexMap<PublisherId, Venue>,
78 symbol_venue_map: HashMap<Symbol, Venue>,
79}
80
81impl DatabentoDataLoader {
82 pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
84 let mut loader = Self {
85 publishers_map: IndexMap::new(),
86 venue_dataset_map: IndexMap::new(),
87 publisher_venue_map: IndexMap::new(),
88 symbol_venue_map: HashMap::new(),
89 };
90
91 let publishers_filepath = if let Some(p) = publishers_filepath {
93 p
94 } else {
95 let mut exe_path = env::current_exe()?;
97 exe_path.pop();
98 exe_path.push("publishers.json");
99 exe_path
100 };
101
102 loader
103 .load_publishers(publishers_filepath)
104 .unwrap_or_else(|e| panic!("Error loading publishers.json: {e}",));
105
106 Ok(loader)
107 }
108
109 pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
111 let file_content = fs::read_to_string(filepath)?;
112 let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
113
114 self.publishers_map = publishers
115 .clone()
116 .into_iter()
117 .map(|p| (p.publisher_id, p))
118 .collect::<IndexMap<u16, DatabentoPublisher>>();
119
120 self.venue_dataset_map = publishers
121 .iter()
122 .map(|p| {
123 (
124 Venue::from(p.venue.as_str()),
125 Dataset::from(p.dataset.as_str()),
126 )
127 })
128 .collect::<IndexMap<Venue, Ustr>>();
129
130 let glbx = Dataset::from("GLBX.MDP3");
132 self.venue_dataset_map.insert(Venue::CBCM(), glbx);
133 self.venue_dataset_map.insert(Venue::GLBX(), glbx);
134 self.venue_dataset_map.insert(Venue::NYUM(), glbx);
135 self.venue_dataset_map.insert(Venue::XCBT(), glbx);
136 self.venue_dataset_map.insert(Venue::XCEC(), glbx);
137 self.venue_dataset_map.insert(Venue::XCME(), glbx);
138 self.venue_dataset_map.insert(Venue::XFXS(), glbx);
139 self.venue_dataset_map.insert(Venue::XNYM(), glbx);
140
141 self.publisher_venue_map = publishers
142 .into_iter()
143 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
144 .collect::<IndexMap<u16, Venue>>();
145
146 Ok(())
147 }
148
149 #[must_use]
151 pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
152 &self.publishers_map
153 }
154
155 #[must_use]
157 pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
158 self.venue_dataset_map.get(venue)
159 }
160
161 #[must_use]
163 pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
164 self.publisher_venue_map.get(&publisher_id)
165 }
166
167 pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
168 let decoder = Decoder::from_zstd_file(filepath)?;
169 let metadata = decoder.metadata();
170 Ok(metadata.schema.map(|schema| schema.to_string()))
171 }
172
173 pub fn read_definition_records(
174 &mut self,
175 filepath: &Path,
176 use_exchange_as_venue: bool,
177 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
178 let mut decoder = Decoder::from_zstd_file(filepath)?;
179
180 let upgrade_policy = dbn::VersionUpgradePolicy::AsIs;
183 decoder.set_upgrade_policy(upgrade_policy);
184
185 let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsgV1>();
186
187 Ok(std::iter::from_fn(move || {
188 if let Err(e) = dbn_stream.advance() {
189 return Some(Err(e.into()));
190 }
191 match dbn_stream.get() {
192 Some(rec) => {
193 let record = dbn::RecordRef::from(rec);
194 let msg = record.get::<InstrumentDefMsgV1>().unwrap();
195
196 let raw_symbol = rec.raw_symbol().expect("Error decoding `raw_symbol`");
197 let symbol = Symbol::from(raw_symbol);
198
199 let publisher = rec.hd.publisher().expect("Invalid `publisher` for record");
200 let venue = match publisher {
201 Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
202 let exchange = rec.exchange().unwrap();
204 let venue = Venue::from_code(exchange).unwrap_or_else(|_| {
205 panic!("`Venue` not found for exchange {exchange}")
206 });
207 self.symbol_venue_map.insert(symbol, venue);
208 venue
209 }
210 _ => *self
211 .publisher_venue_map
212 .get(&msg.hd.publisher_id)
213 .expect("`Venue` not found `publisher_id`"),
214 };
215 let instrument_id = InstrumentId::new(symbol, venue);
216
217 match decode_instrument_def_msg_v1(rec, instrument_id, msg.ts_recv.into()) {
218 Ok(data) => Some(Ok(data)),
219 Err(e) => Some(Err(e)),
220 }
221 }
222 None => None,
223 }
224 }))
225 }
226
227 pub fn read_records<T>(
228 &self,
229 filepath: &Path,
230 instrument_id: Option<InstrumentId>,
231 price_precision: Option<u8>,
232 include_trades: bool,
233 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
234 where
235 T: dbn::Record + dbn::HasRType + 'static,
236 {
237 let decoder = Decoder::from_zstd_file(filepath)?;
238 let metadata = decoder.metadata().clone();
239 let mut dbn_stream = decoder.decode_stream::<T>();
240
241 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
242
243 Ok(std::iter::from_fn(move || {
244 if let Err(e) = dbn_stream.advance() {
245 return Some(Err(e.into()));
246 }
247 match dbn_stream.get() {
248 Some(rec) => {
249 let record = dbn::RecordRef::from(rec);
250 let instrument_id = match &instrument_id {
251 Some(id) => *id, None => decode_nautilus_instrument_id(
253 &record,
254 &metadata,
255 &self.publisher_venue_map,
256 &self.symbol_venue_map,
257 )
258 .expect("Failed to decode record"),
259 };
260
261 match decode_record(
262 &record,
263 instrument_id,
264 price_precision,
265 None,
266 include_trades,
267 ) {
268 Ok(data) => Some(Ok(data)),
269 Err(e) => Some(Err(e)),
270 }
271 }
272 None => None,
273 }
274 }))
275 }
276
277 pub fn load_instruments(
278 &mut self,
279 filepath: &Path,
280 use_exchange_as_venue: bool,
281 ) -> anyhow::Result<Vec<InstrumentAny>> {
282 self.read_definition_records(filepath, use_exchange_as_venue)?
283 .collect::<Result<Vec<_>, _>>()
284 }
285
286 pub fn load_order_book_deltas(
288 &self,
289 filepath: &Path,
290 instrument_id: Option<InstrumentId>,
291 price_precision: Option<u8>,
292 ) -> anyhow::Result<Vec<OrderBookDelta>> {
293 self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false)?
294 .filter_map(|result| match result {
295 Ok((Some(item1), _)) => {
296 if let Data::Delta(delta) = item1 {
297 Some(Ok(delta))
298 } else {
299 None
300 }
301 }
302 Ok((None, _)) => None,
303 Err(e) => Some(Err(e)),
304 })
305 .collect()
306 }
307
308 pub fn load_order_book_depth10(
309 &self,
310 filepath: &Path,
311 instrument_id: Option<InstrumentId>,
312 price_precision: Option<u8>,
313 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
314 self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false)?
315 .filter_map(|result| match result {
316 Ok((Some(item1), _)) => {
317 if let Data::Depth10(depth) = item1 {
318 Some(Ok(*depth))
319 } else {
320 None
321 }
322 }
323 Ok((None, _)) => None,
324 Err(e) => Some(Err(e)),
325 })
326 .collect()
327 }
328
329 pub fn load_quotes(
330 &self,
331 filepath: &Path,
332 instrument_id: Option<InstrumentId>,
333 price_precision: Option<u8>,
334 ) -> anyhow::Result<Vec<QuoteTick>> {
335 self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false)?
336 .filter_map(|result| match result {
337 Ok((Some(item1), _)) => {
338 if let Data::Quote(quote) = item1 {
339 Some(Ok(quote))
340 } else {
341 None
342 }
343 }
344 Ok((None, _)) => None,
345 Err(e) => Some(Err(e)),
346 })
347 .collect()
348 }
349
350 pub fn load_bbo_quotes(
351 &self,
352 filepath: &Path,
353 instrument_id: Option<InstrumentId>,
354 price_precision: Option<u8>,
355 ) -> anyhow::Result<Vec<QuoteTick>> {
356 self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false)?
357 .filter_map(|result| match result {
358 Ok((Some(item1), _)) => {
359 if let Data::Quote(quote) = item1 {
360 Some(Ok(quote))
361 } else {
362 None
363 }
364 }
365 Ok((None, _)) => None,
366 Err(e) => Some(Err(e)),
367 })
368 .collect()
369 }
370
371 pub fn load_tbbo_trades(
372 &self,
373 filepath: &Path,
374 instrument_id: Option<InstrumentId>,
375 price_precision: Option<u8>,
376 ) -> anyhow::Result<Vec<TradeTick>> {
377 self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false)?
378 .filter_map(|result| match result {
379 Ok((_, maybe_item2)) => {
380 if let Some(Data::Trade(trade)) = maybe_item2 {
381 Some(Ok(trade))
382 } else {
383 None
384 }
385 }
386 Err(e) => Some(Err(e)),
387 })
388 .collect()
389 }
390
391 pub fn load_trades(
392 &self,
393 filepath: &Path,
394 instrument_id: Option<InstrumentId>,
395 price_precision: Option<u8>,
396 ) -> anyhow::Result<Vec<TradeTick>> {
397 self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false)?
398 .filter_map(|result| match result {
399 Ok((Some(item1), _)) => {
400 if let Data::Trade(trade) = item1 {
401 Some(Ok(trade))
402 } else {
403 None
404 }
405 }
406 Ok((None, _)) => None,
407 Err(e) => Some(Err(e)),
408 })
409 .collect()
410 }
411
412 pub fn load_bars(
413 &self,
414 filepath: &Path,
415 instrument_id: Option<InstrumentId>,
416 price_precision: Option<u8>,
417 ) -> anyhow::Result<Vec<Bar>> {
418 self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, price_precision, false)?
419 .filter_map(|result| match result {
420 Ok((Some(item1), _)) => {
421 if let Data::Bar(bar) = item1 {
422 Some(Ok(bar))
423 } else {
424 None
425 }
426 }
427 Ok((None, _)) => None,
428 Err(e) => Some(Err(e)),
429 })
430 .collect()
431 }
432
433 pub fn load_status_records<T>(
434 &self,
435 filepath: &Path,
436 instrument_id: Option<InstrumentId>,
437 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
438 where
439 T: dbn::Record + dbn::HasRType + 'static,
440 {
441 let decoder = Decoder::from_zstd_file(filepath)?;
442 let metadata = decoder.metadata().clone();
443 let mut dbn_stream = decoder.decode_stream::<T>();
444
445 Ok(std::iter::from_fn(move || {
446 if let Err(e) = dbn_stream.advance() {
447 return Some(Err(e.into()));
448 }
449 match dbn_stream.get() {
450 Some(rec) => {
451 let record = dbn::RecordRef::from(rec);
452 let instrument_id = match &instrument_id {
453 Some(id) => *id, None => decode_nautilus_instrument_id(
455 &record,
456 &metadata,
457 &self.publisher_venue_map,
458 &self.symbol_venue_map,
459 )
460 .expect("Failed to decode record"),
461 };
462
463 let msg = record.get::<dbn::StatusMsg>().expect("Invalid `StatusMsg`");
464 match decode_status_msg(msg, instrument_id, msg.ts_recv.into()) {
465 Ok(data) => Some(Ok(data)),
466 Err(e) => Some(Err(e)),
467 }
468 }
469 None => None,
470 }
471 }))
472 }
473
474 pub fn read_imbalance_records<T>(
475 &self,
476 filepath: &Path,
477 instrument_id: Option<InstrumentId>,
478 price_precision: Option<u8>,
479 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
480 where
481 T: dbn::Record + dbn::HasRType + 'static,
482 {
483 let decoder = Decoder::from_zstd_file(filepath)?;
484 let metadata = decoder.metadata().clone();
485 let mut dbn_stream = decoder.decode_stream::<T>();
486
487 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
488
489 Ok(std::iter::from_fn(move || {
490 if let Err(e) = dbn_stream.advance() {
491 return Some(Err(e.into()));
492 }
493 match dbn_stream.get() {
494 Some(rec) => {
495 let record = dbn::RecordRef::from(rec);
496 let instrument_id = match &instrument_id {
497 Some(id) => *id, None => decode_nautilus_instrument_id(
499 &record,
500 &metadata,
501 &self.publisher_venue_map,
502 &self.symbol_venue_map,
503 )
504 .expect("Failed to decode record"),
505 };
506
507 let msg = record
508 .get::<dbn::ImbalanceMsg>()
509 .expect("Invalid `ImbalanceMsg`");
510 match decode_imbalance_msg(
511 msg,
512 instrument_id,
513 price_precision,
514 msg.ts_recv.into(),
515 ) {
516 Ok(data) => Some(Ok(data)),
517 Err(e) => Some(Err(e)),
518 }
519 }
520 None => None,
521 }
522 }))
523 }
524
525 pub fn read_statistics_records<T>(
526 &self,
527 filepath: &Path,
528 instrument_id: Option<InstrumentId>,
529 price_precision: Option<u8>,
530 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
531 where
532 T: dbn::Record + dbn::HasRType + 'static,
533 {
534 let decoder = Decoder::from_zstd_file(filepath)?;
535 let metadata = decoder.metadata().clone();
536 let mut dbn_stream = decoder.decode_stream::<T>();
537
538 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
539
540 Ok(std::iter::from_fn(move || {
541 if let Err(e) = dbn_stream.advance() {
542 return Some(Err(e.into()));
543 }
544 match dbn_stream.get() {
545 Some(rec) => {
546 let record = dbn::RecordRef::from(rec);
547 let instrument_id = match &instrument_id {
548 Some(id) => *id, None => decode_nautilus_instrument_id(
550 &record,
551 &metadata,
552 &self.publisher_venue_map,
553 &self.symbol_venue_map,
554 )
555 .expect("Failed to decode record"),
556 };
557
558 let msg = record.get::<dbn::StatMsg>().expect("Invalid `StatMsg`");
559 match decode_statistics_msg(
560 msg,
561 instrument_id,
562 price_precision,
563 msg.ts_recv.into(),
564 ) {
565 Ok(data) => Some(Ok(data)),
566 Err(e) => Some(Err(e)),
567 }
568 }
569 None => None,
570 }
571 }))
572 }
573}
574
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use rstest::*;

    use super::*;

    /// Returns the `test_data` directory under `CARGO_MANIFEST_DIR`.
    fn test_data_path() -> PathBuf {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        manifest_dir.join("test_data")
    }

    /// Builds a loader from the `publishers.json` under `CARGO_MANIFEST_DIR`.
    fn data_loader() -> DatabentoDataLoader {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        let publishers = manifest_dir.join("publishers.json");
        DatabentoDataLoader::new(Some(publishers)).unwrap()
    }

    /// Instrument ID passed explicitly to the market data loaders under test.
    fn esm4_glbx() -> InstrumentId {
        InstrumentId::from("ESM4.GLBX")
    }

    #[rstest]
    #[case(test_data_path().join("test_data.definition.v1.dbn.zst"))]
    fn test_load_instruments(#[case] path: PathBuf) {
        let mut loader = data_loader();

        let loaded = loader.load_instruments(&path, false).unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_deltas() {
        let loader = data_loader();
        let file = test_data_path().join("test_data.mbo.dbn.zst");

        let loaded = loader
            .load_order_book_deltas(&file, Some(esm4_glbx()), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10() {
        let loader = data_loader();
        let file = test_data_path().join("test_data.mbp-10.dbn.zst");

        let loaded = loader
            .load_order_book_depth10(&file, Some(esm4_glbx()), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_quotes() {
        let loader = data_loader();
        let file = test_data_path().join("test_data.mbp-1.dbn.zst");

        let loaded = loader
            .load_quotes(&file, Some(esm4_glbx()), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(#[case] path: PathBuf) {
        let loader = data_loader();

        let loaded = loader
            .load_bbo_quotes(&path, Some(esm4_glbx()), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    fn test_load_tbbo_trades() {
        let loader = data_loader();
        let file = test_data_path().join("test_data.tbbo.dbn.zst");

        // Only checks that loading succeeds; no record count is asserted here
        let _loaded = loader
            .load_tbbo_trades(&file, Some(esm4_glbx()), None)
            .unwrap();
    }

    #[rstest]
    fn test_load_trades() {
        let loader = data_loader();
        let file = test_data_path().join("test_data.trades.dbn.zst");

        let loaded = loader
            .load_trades(&file, Some(esm4_glbx()), None)
            .unwrap();

        assert_eq!(loaded.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(#[case] path: PathBuf) {
        let loader = data_loader();

        let loaded = loader.load_bars(&path, Some(esm4_glbx()), None).unwrap();

        assert_eq!(loaded.len(), 2);
    }
}