1use std::{
19 fs,
20 num::NonZeroU64,
21 path::PathBuf,
22 str::FromStr,
23 sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28 dbn::{self, decode::DbnMetadata},
29 historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34 data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
35 enums::BarAggregation,
36 identifiers::{InstrumentId, Symbol, Venue},
37 instruments::InstrumentAny,
38 types::Currency,
39};
40
41use crate::{
42 common::get_date_time_range,
43 decode::{
44 decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
45 decode_statistics_msg, decode_status_msg,
46 },
47 symbology::{
48 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
49 infer_symbology_type, instrument_id_to_symbol_string,
50 },
51 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
52};
53
54#[derive(Debug, Clone)]
59pub struct DatabentoHistoricalClient {
60 pub key: String,
61 clock: &'static AtomicTime,
62 inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
63 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
64 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
65 use_exchange_as_venue: bool,
66}
67
68#[derive(Debug)]
70pub struct RangeQueryParams {
71 pub dataset: String,
72 pub symbols: Vec<String>,
73 pub start: UnixNanos,
74 pub end: Option<UnixNanos>,
75 pub limit: Option<u64>,
76 pub price_precision: Option<u8>,
77}
78
/// The available time range of a Databento dataset.
///
/// Both bounds hold the string form of the timestamps returned by the metadata endpoint.
#[derive(Clone, Debug)]
pub struct DatasetRange {
    /// The first available timestamp of the dataset (as a string).
    pub start: String,
    /// The last available timestamp of the dataset (as a string).
    pub end: String,
}
85
86impl DatabentoHistoricalClient {
87 pub fn new(
93 key: String,
94 publishers_filepath: PathBuf,
95 clock: &'static AtomicTime,
96 use_exchange_as_venue: bool,
97 ) -> anyhow::Result<Self> {
98 let client = databento::HistoricalClient::builder()
99 .user_agent_extension(NAUTILUS_USER_AGENT.into())
100 .key(key.clone())
101 .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
102 .build()
103 .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
104
105 let file_content = fs::read_to_string(publishers_filepath)?;
106 let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
107
108 let publisher_venue_map = publishers_vec
109 .into_iter()
110 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
111 .collect::<IndexMap<u16, Venue>>();
112
113 Ok(Self {
114 clock,
115 inner: Arc::new(tokio::sync::Mutex::new(client)),
116 publisher_venue_map: Arc::new(publisher_venue_map),
117 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
118 key,
119 use_exchange_as_venue,
120 })
121 }
122
123 pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
129 let mut client = self.inner.lock().await;
130 let response = client
131 .metadata()
132 .get_dataset_range(dataset)
133 .await
134 .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
135
136 Ok(DatasetRange {
137 start: response.start.to_string(),
138 end: response.end.to_string(),
139 })
140 }
141
142 pub async fn get_range_instruments(
148 &self,
149 params: RangeQueryParams,
150 ) -> anyhow::Result<Vec<InstrumentAny>> {
151 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
152 check_consistent_symbology(&symbols)?;
153
154 let first_symbol = params
155 .symbols
156 .first()
157 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
158 let stype_in = infer_symbology_type(first_symbol);
159 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
160 let time_range = get_date_time_range(params.start, end)?;
161
162 let range_params = GetRangeParams::builder()
163 .dataset(params.dataset)
164 .date_time_range(time_range)
165 .symbols(symbols)
166 .stype_in(stype_in)
167 .schema(dbn::Schema::Definition)
168 .limit(params.limit.and_then(NonZeroU64::new))
169 .build();
170
171 let mut client = self.inner.lock().await;
172 let mut decoder = client
173 .timeseries()
174 .get_range(&range_params)
175 .await
176 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
177
178 let metadata = decoder.metadata().clone();
179 let mut metadata_cache = MetadataCache::new(metadata);
180 let mut instruments = Vec::new();
181
182 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183 let record = dbn::RecordRef::from(msg);
184 let sym_map = self
185 .symbol_venue_map
186 .read()
187 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
188 let mut instrument_id = decode_nautilus_instrument_id(
189 &record,
190 &mut metadata_cache,
191 &self.publisher_venue_map,
192 &sym_map,
193 )?;
194
195 if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
196 let exchange = msg
197 .exchange()
198 .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
199 let venue = Venue::from_code(exchange)
200 .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
201 instrument_id.venue = venue;
202 }
203
204 match decode_instrument_def_msg(msg, instrument_id, None) {
205 Ok(instrument) => instruments.push(instrument),
206 Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
207 }
208 }
209
210 Ok(instruments)
211 }
212
213 pub async fn get_range_quotes(
219 &self,
220 params: RangeQueryParams,
221 schema: Option<String>,
222 ) -> anyhow::Result<Vec<QuoteTick>> {
223 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
224 check_consistent_symbology(&symbols)?;
225
226 let first_symbol = params
227 .symbols
228 .first()
229 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
230 let stype_in = infer_symbology_type(first_symbol);
231 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
232 let time_range = get_date_time_range(params.start, end)?;
233 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
234 let dbn_schema = dbn::Schema::from_str(&schema)?;
235
236 match dbn_schema {
237 dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
238 _ => anyhow::bail!("Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m"),
239 }
240
241 let range_params = GetRangeParams::builder()
242 .dataset(params.dataset)
243 .date_time_range(time_range)
244 .symbols(symbols)
245 .stype_in(stype_in)
246 .schema(dbn_schema)
247 .limit(params.limit.and_then(NonZeroU64::new))
248 .build();
249
250 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
251
252 let mut client = self.inner.lock().await;
253 let mut decoder = client
254 .timeseries()
255 .get_range(&range_params)
256 .await
257 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
258
259 let metadata = decoder.metadata().clone();
260 let mut metadata_cache = MetadataCache::new(metadata);
261 let mut result: Vec<QuoteTick> = Vec::new();
262
263 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
264 let sym_map = self
265 .symbol_venue_map
266 .read()
267 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
268 let instrument_id = decode_nautilus_instrument_id(
269 &record,
270 &mut metadata_cache,
271 &self.publisher_venue_map,
272 &sym_map,
273 )?;
274
275 let (data, _) = decode_record(
276 &record,
277 instrument_id,
278 price_precision,
279 None,
280 false, true,
282 )?;
283
284 match data {
285 Some(Data::Quote(quote)) => result.push(quote),
286 None => {} _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
288 }
289 Ok(())
290 };
291
292 match dbn_schema {
293 dbn::Schema::Mbp1 => {
294 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
295 process_record(dbn::RecordRef::from(msg))?;
296 }
297 }
298 dbn::Schema::Bbo1M => {
299 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
300 process_record(dbn::RecordRef::from(msg))?;
301 }
302 }
303 dbn::Schema::Bbo1S => {
304 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
305 process_record(dbn::RecordRef::from(msg))?;
306 }
307 }
308 _ => anyhow::bail!("Invalid schema {dbn_schema}"),
309 }
310
311 Ok(result)
312 }
313
314 pub async fn get_range_order_book_depth10(
320 &self,
321 params: RangeQueryParams,
322 depth: Option<usize>,
323 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
324 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
325 check_consistent_symbology(&symbols)?;
326
327 let first_symbol = params
328 .symbols
329 .first()
330 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
331 let stype_in = infer_symbology_type(first_symbol);
332 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
333 let time_range = get_date_time_range(params.start, end)?;
334
335 let _depth = depth.unwrap_or(10);
337 if _depth != 10 {
338 anyhow::bail!("Only depth=10 is currently supported for order book depths");
339 }
340
341 let range_params = GetRangeParams::builder()
342 .dataset(params.dataset)
343 .date_time_range(time_range)
344 .symbols(symbols)
345 .stype_in(stype_in)
346 .schema(dbn::Schema::Mbp10)
347 .limit(params.limit.and_then(NonZeroU64::new))
348 .build();
349
350 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
351
352 let mut client = self.inner.lock().await;
353 let mut decoder = client
354 .timeseries()
355 .get_range(&range_params)
356 .await
357 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
358
359 let metadata = decoder.metadata().clone();
360 let mut metadata_cache = MetadataCache::new(metadata);
361 let mut result: Vec<OrderBookDepth10> = Vec::new();
362
363 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
364 let sym_map = self
365 .symbol_venue_map
366 .read()
367 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
368 let instrument_id = decode_nautilus_instrument_id(
369 &record,
370 &mut metadata_cache,
371 &self.publisher_venue_map,
372 &sym_map,
373 )?;
374
375 if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
376 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
377 result.push(depth);
378 }
379
380 Ok(())
381 };
382
383 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
384 process_record(dbn::RecordRef::from(msg))?;
385 }
386
387 Ok(result)
388 }
389
390 pub async fn get_range_trades(
396 &self,
397 params: RangeQueryParams,
398 ) -> anyhow::Result<Vec<TradeTick>> {
399 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
400 check_consistent_symbology(&symbols)?;
401
402 let first_symbol = params
403 .symbols
404 .first()
405 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
406 let stype_in = infer_symbology_type(first_symbol);
407 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
408 let time_range = get_date_time_range(params.start, end)?;
409
410 let range_params = GetRangeParams::builder()
411 .dataset(params.dataset)
412 .date_time_range(time_range)
413 .symbols(symbols)
414 .stype_in(stype_in)
415 .schema(dbn::Schema::Trades)
416 .limit(params.limit.and_then(NonZeroU64::new))
417 .build();
418
419 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
420
421 let mut client = self.inner.lock().await;
422 let mut decoder = client
423 .timeseries()
424 .get_range(&range_params)
425 .await
426 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
427
428 let metadata = decoder.metadata().clone();
429 let mut metadata_cache = MetadataCache::new(metadata);
430 let mut result: Vec<TradeTick> = Vec::new();
431
432 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
433 let record = dbn::RecordRef::from(msg);
434 let sym_map = self
435 .symbol_venue_map
436 .read()
437 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
438 let instrument_id = decode_nautilus_instrument_id(
439 &record,
440 &mut metadata_cache,
441 &self.publisher_venue_map,
442 &sym_map,
443 )?;
444
445 let (data, _) = decode_record(
446 &record,
447 instrument_id,
448 price_precision,
449 None,
450 false, true,
452 )?;
453
454 match data {
455 Some(Data::Trade(trade)) => {
456 result.push(trade);
457 }
458 _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
459 }
460 }
461
462 Ok(result)
463 }
464
465 pub async fn get_range_bars(
471 &self,
472 params: RangeQueryParams,
473 aggregation: BarAggregation,
474 timestamp_on_close: bool,
475 ) -> anyhow::Result<Vec<Bar>> {
476 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
477 check_consistent_symbology(&symbols)?;
478
479 let first_symbol = params
480 .symbols
481 .first()
482 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
483 let stype_in = infer_symbology_type(first_symbol);
484 let schema = match aggregation {
485 BarAggregation::Second => dbn::Schema::Ohlcv1S,
486 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
487 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
488 BarAggregation::Day => dbn::Schema::Ohlcv1D,
489 _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
490 };
491
492 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
493 let time_range = get_date_time_range(params.start, end)?;
494
495 let range_params = GetRangeParams::builder()
496 .dataset(params.dataset)
497 .date_time_range(time_range)
498 .symbols(symbols)
499 .stype_in(stype_in)
500 .schema(schema)
501 .limit(params.limit.and_then(NonZeroU64::new))
502 .build();
503
504 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
505
506 let mut client = self.inner.lock().await;
507 let mut decoder = client
508 .timeseries()
509 .get_range(&range_params)
510 .await
511 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
512
513 let metadata = decoder.metadata().clone();
514 let mut metadata_cache = MetadataCache::new(metadata);
515 let mut result: Vec<Bar> = Vec::new();
516
517 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
518 let record = dbn::RecordRef::from(msg);
519 let sym_map = self
520 .symbol_venue_map
521 .read()
522 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
523 let instrument_id = decode_nautilus_instrument_id(
524 &record,
525 &mut metadata_cache,
526 &self.publisher_venue_map,
527 &sym_map,
528 )?;
529
530 let (data, _) = decode_record(
531 &record,
532 instrument_id,
533 price_precision,
534 None,
535 false, timestamp_on_close,
537 )?;
538
539 match data {
540 Some(Data::Bar(bar)) => {
541 result.push(bar);
542 }
543 _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
544 }
545 }
546
547 Ok(result)
548 }
549
550 pub async fn get_range_imbalance(
556 &self,
557 params: RangeQueryParams,
558 ) -> anyhow::Result<Vec<DatabentoImbalance>> {
559 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
560 check_consistent_symbology(&symbols)?;
561
562 let first_symbol = params
563 .symbols
564 .first()
565 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
566 let stype_in = infer_symbology_type(first_symbol);
567 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
568 let time_range = get_date_time_range(params.start, end)?;
569
570 let range_params = GetRangeParams::builder()
571 .dataset(params.dataset)
572 .date_time_range(time_range)
573 .symbols(symbols)
574 .stype_in(stype_in)
575 .schema(dbn::Schema::Imbalance)
576 .limit(params.limit.and_then(NonZeroU64::new))
577 .build();
578
579 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
580
581 let mut client = self.inner.lock().await;
582 let mut decoder = client
583 .timeseries()
584 .get_range(&range_params)
585 .await
586 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
587
588 let metadata = decoder.metadata().clone();
589 let mut metadata_cache = MetadataCache::new(metadata);
590 let mut result: Vec<DatabentoImbalance> = Vec::new();
591
592 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
593 let record = dbn::RecordRef::from(msg);
594 let sym_map = self
595 .symbol_venue_map
596 .read()
597 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
598 let instrument_id = decode_nautilus_instrument_id(
599 &record,
600 &mut metadata_cache,
601 &self.publisher_venue_map,
602 &sym_map,
603 )?;
604
605 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
606 result.push(imbalance);
607 }
608
609 Ok(result)
610 }
611
612 pub async fn get_range_statistics(
618 &self,
619 params: RangeQueryParams,
620 ) -> anyhow::Result<Vec<DatabentoStatistics>> {
621 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
622 check_consistent_symbology(&symbols)?;
623
624 let first_symbol = params
625 .symbols
626 .first()
627 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
628 let stype_in = infer_symbology_type(first_symbol);
629 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
630 let time_range = get_date_time_range(params.start, end)?;
631
632 let range_params = GetRangeParams::builder()
633 .dataset(params.dataset)
634 .date_time_range(time_range)
635 .symbols(symbols)
636 .stype_in(stype_in)
637 .schema(dbn::Schema::Statistics)
638 .limit(params.limit.and_then(NonZeroU64::new))
639 .build();
640
641 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
642
643 let mut client = self.inner.lock().await;
644 let mut decoder = client
645 .timeseries()
646 .get_range(&range_params)
647 .await
648 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
649
650 let metadata = decoder.metadata().clone();
651 let mut metadata_cache = MetadataCache::new(metadata);
652 let mut result: Vec<DatabentoStatistics> = Vec::new();
653
654 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
655 let record = dbn::RecordRef::from(msg);
656 let sym_map = self
657 .symbol_venue_map
658 .read()
659 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
660 let instrument_id = decode_nautilus_instrument_id(
661 &record,
662 &mut metadata_cache,
663 &self.publisher_venue_map,
664 &sym_map,
665 )?;
666
667 let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
668 result.push(statistics);
669 }
670
671 Ok(result)
672 }
673
674 pub async fn get_range_status(
680 &self,
681 params: RangeQueryParams,
682 ) -> anyhow::Result<Vec<InstrumentStatus>> {
683 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
684 check_consistent_symbology(&symbols)?;
685
686 let first_symbol = params
687 .symbols
688 .first()
689 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
690 let stype_in = infer_symbology_type(first_symbol);
691 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
692 let time_range = get_date_time_range(params.start, end)?;
693
694 let range_params = GetRangeParams::builder()
695 .dataset(params.dataset)
696 .date_time_range(time_range)
697 .symbols(symbols)
698 .stype_in(stype_in)
699 .schema(dbn::Schema::Status)
700 .limit(params.limit.and_then(NonZeroU64::new))
701 .build();
702
703 let mut client = self.inner.lock().await;
704 let mut decoder = client
705 .timeseries()
706 .get_range(&range_params)
707 .await
708 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
709
710 let metadata = decoder.metadata().clone();
711 let mut metadata_cache = MetadataCache::new(metadata);
712 let mut result: Vec<InstrumentStatus> = Vec::new();
713
714 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
715 let record = dbn::RecordRef::from(msg);
716 let sym_map = self
717 .symbol_venue_map
718 .read()
719 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
720 let instrument_id = decode_nautilus_instrument_id(
721 &record,
722 &mut metadata_cache,
723 &self.publisher_venue_map,
724 &sym_map,
725 )?;
726
727 let status = decode_status_msg(msg, instrument_id, None)?;
728 result.push(status);
729 }
730
731 Ok(result)
732 }
733
734 pub fn prepare_symbols_from_instrument_ids(
740 &self,
741 instrument_ids: &[InstrumentId],
742 ) -> anyhow::Result<Vec<String>> {
743 let mut symbol_venue_map = self
744 .symbol_venue_map
745 .write()
746 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
747
748 let symbols: Vec<String> = instrument_ids
749 .iter()
750 .map(|instrument_id| {
751 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
752 })
753 .collect();
754
755 Ok(symbols)
756 }
757}