1use std::{
19 fs,
20 num::NonZeroU64,
21 path::PathBuf,
22 str::FromStr,
23 sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28 dbn::{self, decode::DbnMetadata},
29 historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34 data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
35 enums::BarAggregation,
36 identifiers::{InstrumentId, Symbol, Venue},
37 instruments::InstrumentAny,
38 types::Currency,
39};
40use tokio::sync::Mutex;
41
42use crate::{
43 common::get_date_time_range,
44 decode::{
45 decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
46 decode_statistics_msg, decode_status_msg,
47 },
48 symbology::{
49 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
50 infer_symbology_type, instrument_id_to_symbol_string,
51 },
52 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
53};
54
55#[derive(Debug, Clone)]
60pub struct DatabentoHistoricalClient {
61 pub key: String,
62 clock: &'static AtomicTime,
63 inner: Arc<Mutex<databento::HistoricalClient>>,
64 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
65 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
66 use_exchange_as_venue: bool,
67}
68
/// Parameters shared by the `get_range_*` requests of `DatabentoHistoricalClient`.
#[derive(Debug)]
pub struct RangeQueryParams {
    /// Databento dataset code to query.
    pub dataset: String,
    /// Symbols to request. They must use consistent symbology, and the input symbology
    /// type is inferred from the first symbol.
    pub symbols: Vec<String>,
    /// Start of the requested time range.
    pub start: UnixNanos,
    /// End of the requested time range; `None` uses the client clock's current time.
    pub end: Option<UnixNanos>,
    /// Maximum number of records to request; `Some(0)` is treated the same as `None`
    /// (no limit is passed to Databento).
    pub limit: Option<u64>,
    /// Precision used when decoding prices; `None` falls back to the USD currency precision.
    pub price_precision: Option<u8>,
}
79
/// Available time range of a Databento dataset, as reported by the metadata API.
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// Start of the available range, as the `to_string()` form of the API response value.
    pub start: String,
    /// End of the available range, as the `to_string()` form of the API response value.
    pub end: String,
}
86
87impl DatabentoHistoricalClient {
88 pub fn new(
94 key: String,
95 publishers_filepath: PathBuf,
96 clock: &'static AtomicTime,
97 use_exchange_as_venue: bool,
98 ) -> anyhow::Result<Self> {
99 let client = databento::HistoricalClient::builder()
100 .user_agent_extension(NAUTILUS_USER_AGENT.into())
101 .key(key.clone())
102 .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
103 .build()
104 .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
105
106 let file_content = fs::read_to_string(publishers_filepath)?;
107 let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
108
109 let publisher_venue_map = publishers_vec
110 .into_iter()
111 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
112 .collect::<IndexMap<u16, Venue>>();
113
114 Ok(Self {
115 clock,
116 inner: Arc::new(Mutex::new(client)),
117 publisher_venue_map: Arc::new(publisher_venue_map),
118 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
119 key,
120 use_exchange_as_venue,
121 })
122 }
123
124 pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
130 let mut client = self.inner.lock().await;
131 let response = client
132 .metadata()
133 .get_dataset_range(dataset)
134 .await
135 .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
136
137 Ok(DatasetRange {
138 start: response.start.to_string(),
139 end: response.end.to_string(),
140 })
141 }
142
143 pub async fn get_range_instruments(
149 &self,
150 params: RangeQueryParams,
151 ) -> anyhow::Result<Vec<InstrumentAny>> {
152 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
153 check_consistent_symbology(&symbols)?;
154
155 let first_symbol = params
156 .symbols
157 .first()
158 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
159 let stype_in = infer_symbology_type(first_symbol);
160 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
161 let time_range = get_date_time_range(params.start, end)?;
162
163 let range_params = GetRangeParams::builder()
164 .dataset(params.dataset)
165 .date_time_range(time_range)
166 .symbols(symbols)
167 .stype_in(stype_in)
168 .schema(dbn::Schema::Definition)
169 .limit(params.limit.and_then(NonZeroU64::new))
170 .build();
171
172 let mut client = self.inner.lock().await;
173 let mut decoder = client
174 .timeseries()
175 .get_range(&range_params)
176 .await
177 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
178
179 let metadata = decoder.metadata().clone();
180 let mut metadata_cache = MetadataCache::new(metadata);
181 let mut instruments = Vec::new();
182
183 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
184 let record = dbn::RecordRef::from(msg);
185 let sym_map = self
186 .symbol_venue_map
187 .read()
188 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
189 let mut instrument_id = decode_nautilus_instrument_id(
190 &record,
191 &mut metadata_cache,
192 &self.publisher_venue_map,
193 &sym_map,
194 )?;
195
196 if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
197 let exchange = msg
198 .exchange()
199 .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
200 let venue = Venue::from_code(exchange)
201 .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
202 instrument_id.venue = venue;
203 }
204
205 match decode_instrument_def_msg(msg, instrument_id, None) {
206 Ok(instrument) => instruments.push(instrument),
207 Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
208 }
209 }
210
211 Ok(instruments)
212 }
213
214 pub async fn get_range_quotes(
220 &self,
221 params: RangeQueryParams,
222 schema: Option<String>,
223 ) -> anyhow::Result<Vec<QuoteTick>> {
224 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
225 check_consistent_symbology(&symbols)?;
226
227 let first_symbol = params
228 .symbols
229 .first()
230 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
231 let stype_in = infer_symbology_type(first_symbol);
232 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
233 let time_range = get_date_time_range(params.start, end)?;
234 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
235 let dbn_schema = dbn::Schema::from_str(&schema)?;
236
237 match dbn_schema {
238 dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
239 _ => anyhow::bail!("Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m"),
240 }
241
242 let range_params = GetRangeParams::builder()
243 .dataset(params.dataset)
244 .date_time_range(time_range)
245 .symbols(symbols)
246 .stype_in(stype_in)
247 .schema(dbn_schema)
248 .limit(params.limit.and_then(NonZeroU64::new))
249 .build();
250
251 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
252
253 let mut client = self.inner.lock().await;
254 let mut decoder = client
255 .timeseries()
256 .get_range(&range_params)
257 .await
258 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
259
260 let metadata = decoder.metadata().clone();
261 let mut metadata_cache = MetadataCache::new(metadata);
262 let mut result: Vec<QuoteTick> = Vec::new();
263
264 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
265 let sym_map = self
266 .symbol_venue_map
267 .read()
268 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
269 let instrument_id = decode_nautilus_instrument_id(
270 &record,
271 &mut metadata_cache,
272 &self.publisher_venue_map,
273 &sym_map,
274 )?;
275
276 let (data, _) = decode_record(
277 &record,
278 instrument_id,
279 price_precision,
280 None,
281 false, true,
283 )?;
284
285 match data {
286 Some(Data::Quote(quote)) => {
287 result.push(quote);
288 Ok(())
289 }
290 _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
291 }
292 };
293
294 match dbn_schema {
295 dbn::Schema::Mbp1 => {
296 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
297 process_record(dbn::RecordRef::from(msg))?;
298 }
299 }
300 dbn::Schema::Bbo1M => {
301 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
302 process_record(dbn::RecordRef::from(msg))?;
303 }
304 }
305 dbn::Schema::Bbo1S => {
306 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
307 process_record(dbn::RecordRef::from(msg))?;
308 }
309 }
310 _ => anyhow::bail!("Invalid schema {dbn_schema}"),
311 }
312
313 Ok(result)
314 }
315
316 pub async fn get_range_order_book_depth10(
322 &self,
323 params: RangeQueryParams,
324 depth: Option<usize>,
325 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
326 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
327 check_consistent_symbology(&symbols)?;
328
329 let first_symbol = params
330 .symbols
331 .first()
332 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
333 let stype_in = infer_symbology_type(first_symbol);
334 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
335 let time_range = get_date_time_range(params.start, end)?;
336
337 let _depth = depth.unwrap_or(10);
339 if _depth != 10 {
340 anyhow::bail!("Only depth=10 is currently supported for order book depths");
341 }
342
343 let range_params = GetRangeParams::builder()
344 .dataset(params.dataset)
345 .date_time_range(time_range)
346 .symbols(symbols)
347 .stype_in(stype_in)
348 .schema(dbn::Schema::Mbp10)
349 .limit(params.limit.and_then(NonZeroU64::new))
350 .build();
351
352 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
353
354 let mut client = self.inner.lock().await;
355 let mut decoder = client
356 .timeseries()
357 .get_range(&range_params)
358 .await
359 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
360
361 let metadata = decoder.metadata().clone();
362 let mut metadata_cache = MetadataCache::new(metadata);
363 let mut result: Vec<OrderBookDepth10> = Vec::new();
364
365 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
366 let sym_map = self
367 .symbol_venue_map
368 .read()
369 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
370 let instrument_id = decode_nautilus_instrument_id(
371 &record,
372 &mut metadata_cache,
373 &self.publisher_venue_map,
374 &sym_map,
375 )?;
376
377 if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
378 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
379 result.push(depth);
380 }
381
382 Ok(())
383 };
384
385 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
386 process_record(dbn::RecordRef::from(msg))?;
387 }
388
389 Ok(result)
390 }
391
392 pub async fn get_range_trades(
398 &self,
399 params: RangeQueryParams,
400 ) -> anyhow::Result<Vec<TradeTick>> {
401 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
402 check_consistent_symbology(&symbols)?;
403
404 let first_symbol = params
405 .symbols
406 .first()
407 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
408 let stype_in = infer_symbology_type(first_symbol);
409 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
410 let time_range = get_date_time_range(params.start, end)?;
411
412 let range_params = GetRangeParams::builder()
413 .dataset(params.dataset)
414 .date_time_range(time_range)
415 .symbols(symbols)
416 .stype_in(stype_in)
417 .schema(dbn::Schema::Trades)
418 .limit(params.limit.and_then(NonZeroU64::new))
419 .build();
420
421 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
422
423 let mut client = self.inner.lock().await;
424 let mut decoder = client
425 .timeseries()
426 .get_range(&range_params)
427 .await
428 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
429
430 let metadata = decoder.metadata().clone();
431 let mut metadata_cache = MetadataCache::new(metadata);
432 let mut result: Vec<TradeTick> = Vec::new();
433
434 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
435 let record = dbn::RecordRef::from(msg);
436 let sym_map = self
437 .symbol_venue_map
438 .read()
439 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
440 let instrument_id = decode_nautilus_instrument_id(
441 &record,
442 &mut metadata_cache,
443 &self.publisher_venue_map,
444 &sym_map,
445 )?;
446
447 let (data, _) = decode_record(
448 &record,
449 instrument_id,
450 price_precision,
451 None,
452 false, true,
454 )?;
455
456 match data {
457 Some(Data::Trade(trade)) => {
458 result.push(trade);
459 }
460 _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
461 }
462 }
463
464 Ok(result)
465 }
466
467 pub async fn get_range_bars(
473 &self,
474 params: RangeQueryParams,
475 aggregation: BarAggregation,
476 timestamp_on_close: bool,
477 ) -> anyhow::Result<Vec<Bar>> {
478 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
479 check_consistent_symbology(&symbols)?;
480
481 let first_symbol = params
482 .symbols
483 .first()
484 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
485 let stype_in = infer_symbology_type(first_symbol);
486 let schema = match aggregation {
487 BarAggregation::Second => dbn::Schema::Ohlcv1S,
488 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
489 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
490 BarAggregation::Day => dbn::Schema::Ohlcv1D,
491 _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
492 };
493
494 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
495 let time_range = get_date_time_range(params.start, end)?;
496
497 let range_params = GetRangeParams::builder()
498 .dataset(params.dataset)
499 .date_time_range(time_range)
500 .symbols(symbols)
501 .stype_in(stype_in)
502 .schema(schema)
503 .limit(params.limit.and_then(NonZeroU64::new))
504 .build();
505
506 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
507
508 let mut client = self.inner.lock().await;
509 let mut decoder = client
510 .timeseries()
511 .get_range(&range_params)
512 .await
513 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
514
515 let metadata = decoder.metadata().clone();
516 let mut metadata_cache = MetadataCache::new(metadata);
517 let mut result: Vec<Bar> = Vec::new();
518
519 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
520 let record = dbn::RecordRef::from(msg);
521 let sym_map = self
522 .symbol_venue_map
523 .read()
524 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
525 let instrument_id = decode_nautilus_instrument_id(
526 &record,
527 &mut metadata_cache,
528 &self.publisher_venue_map,
529 &sym_map,
530 )?;
531
532 let (data, _) = decode_record(
533 &record,
534 instrument_id,
535 price_precision,
536 None,
537 false, timestamp_on_close,
539 )?;
540
541 match data {
542 Some(Data::Bar(bar)) => {
543 result.push(bar);
544 }
545 _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
546 }
547 }
548
549 Ok(result)
550 }
551
552 pub async fn get_range_imbalance(
558 &self,
559 params: RangeQueryParams,
560 ) -> anyhow::Result<Vec<DatabentoImbalance>> {
561 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
562 check_consistent_symbology(&symbols)?;
563
564 let first_symbol = params
565 .symbols
566 .first()
567 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
568 let stype_in = infer_symbology_type(first_symbol);
569 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
570 let time_range = get_date_time_range(params.start, end)?;
571
572 let range_params = GetRangeParams::builder()
573 .dataset(params.dataset)
574 .date_time_range(time_range)
575 .symbols(symbols)
576 .stype_in(stype_in)
577 .schema(dbn::Schema::Imbalance)
578 .limit(params.limit.and_then(NonZeroU64::new))
579 .build();
580
581 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
582
583 let mut client = self.inner.lock().await;
584 let mut decoder = client
585 .timeseries()
586 .get_range(&range_params)
587 .await
588 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
589
590 let metadata = decoder.metadata().clone();
591 let mut metadata_cache = MetadataCache::new(metadata);
592 let mut result: Vec<DatabentoImbalance> = Vec::new();
593
594 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
595 let record = dbn::RecordRef::from(msg);
596 let sym_map = self
597 .symbol_venue_map
598 .read()
599 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
600 let instrument_id = decode_nautilus_instrument_id(
601 &record,
602 &mut metadata_cache,
603 &self.publisher_venue_map,
604 &sym_map,
605 )?;
606
607 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
608 result.push(imbalance);
609 }
610
611 Ok(result)
612 }
613
614 pub async fn get_range_statistics(
620 &self,
621 params: RangeQueryParams,
622 ) -> anyhow::Result<Vec<DatabentoStatistics>> {
623 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
624 check_consistent_symbology(&symbols)?;
625
626 let first_symbol = params
627 .symbols
628 .first()
629 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
630 let stype_in = infer_symbology_type(first_symbol);
631 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
632 let time_range = get_date_time_range(params.start, end)?;
633
634 let range_params = GetRangeParams::builder()
635 .dataset(params.dataset)
636 .date_time_range(time_range)
637 .symbols(symbols)
638 .stype_in(stype_in)
639 .schema(dbn::Schema::Statistics)
640 .limit(params.limit.and_then(NonZeroU64::new))
641 .build();
642
643 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
644
645 let mut client = self.inner.lock().await;
646 let mut decoder = client
647 .timeseries()
648 .get_range(&range_params)
649 .await
650 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
651
652 let metadata = decoder.metadata().clone();
653 let mut metadata_cache = MetadataCache::new(metadata);
654 let mut result: Vec<DatabentoStatistics> = Vec::new();
655
656 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
657 let record = dbn::RecordRef::from(msg);
658 let sym_map = self
659 .symbol_venue_map
660 .read()
661 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
662 let instrument_id = decode_nautilus_instrument_id(
663 &record,
664 &mut metadata_cache,
665 &self.publisher_venue_map,
666 &sym_map,
667 )?;
668
669 let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
670 result.push(statistics);
671 }
672
673 Ok(result)
674 }
675
676 pub async fn get_range_status(
682 &self,
683 params: RangeQueryParams,
684 ) -> anyhow::Result<Vec<InstrumentStatus>> {
685 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
686 check_consistent_symbology(&symbols)?;
687
688 let first_symbol = params
689 .symbols
690 .first()
691 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
692 let stype_in = infer_symbology_type(first_symbol);
693 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
694 let time_range = get_date_time_range(params.start, end)?;
695
696 let range_params = GetRangeParams::builder()
697 .dataset(params.dataset)
698 .date_time_range(time_range)
699 .symbols(symbols)
700 .stype_in(stype_in)
701 .schema(dbn::Schema::Status)
702 .limit(params.limit.and_then(NonZeroU64::new))
703 .build();
704
705 let mut client = self.inner.lock().await;
706 let mut decoder = client
707 .timeseries()
708 .get_range(&range_params)
709 .await
710 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
711
712 let metadata = decoder.metadata().clone();
713 let mut metadata_cache = MetadataCache::new(metadata);
714 let mut result: Vec<InstrumentStatus> = Vec::new();
715
716 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
717 let record = dbn::RecordRef::from(msg);
718 let sym_map = self
719 .symbol_venue_map
720 .read()
721 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
722 let instrument_id = decode_nautilus_instrument_id(
723 &record,
724 &mut metadata_cache,
725 &self.publisher_venue_map,
726 &sym_map,
727 )?;
728
729 let status = decode_status_msg(msg, instrument_id, None)?;
730 result.push(status);
731 }
732
733 Ok(result)
734 }
735
736 pub fn prepare_symbols_from_instrument_ids(
742 &self,
743 instrument_ids: &[InstrumentId],
744 ) -> anyhow::Result<Vec<String>> {
745 let mut symbol_venue_map = self
746 .symbol_venue_map
747 .write()
748 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
749
750 let symbols: Vec<String> = instrument_ids
751 .iter()
752 .map(|instrument_id| {
753 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
754 })
755 .collect();
756
757 Ok(symbols)
758 }
759}