1use std::{
19 fs,
20 num::NonZeroU64,
21 path::PathBuf,
22 str::FromStr,
23 sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28 dbn::{self, decode::DbnMetadata},
29 historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34 data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
35 enums::BarAggregation,
36 identifiers::{InstrumentId, Symbol, Venue},
37 instruments::InstrumentAny,
38 types::Currency,
39};
40use tokio::sync::Mutex;
41
42use crate::{
43 common::get_date_time_range,
44 decode::{
45 decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
46 decode_status_msg,
47 },
48 symbology::{
49 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
50 infer_symbology_type, instrument_id_to_symbol_string,
51 },
52 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
53};
54
55#[derive(Debug, Clone)]
60pub struct DatabentoHistoricalClient {
61 pub key: String,
62 clock: &'static AtomicTime,
63 inner: Arc<Mutex<databento::HistoricalClient>>,
64 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
65 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
66 use_exchange_as_venue: bool,
67}
68
/// Parameters shared by the `get_range_*` queries on
/// [`DatabentoHistoricalClient`].
#[derive(Debug)]
pub struct RangeQueryParams {
    /// Dataset identifier forwarded to the Databento range request.
    pub dataset: String,
    /// Symbols to request. Must not be empty; the symbology type of the
    /// request is inferred from the first entry.
    pub symbols: Vec<String>,
    /// Start of the requested time range.
    pub start: UnixNanos,
    /// End of the requested time range. When `None`, the client's clock is
    /// read at call time and used as the end.
    pub end: Option<UnixNanos>,
    /// Maximum number of records to request. `None` and `Some(0)` both result
    /// in no limit being sent (zero cannot be represented as `NonZeroU64`).
    pub limit: Option<u64>,
    /// Price precision used when decoding records. When `None`, the precision
    /// of `Currency::USD()` is used. Not read by the instrument-definition or
    /// status queries.
    pub price_precision: Option<u8>,
}
79
/// Available time range of a dataset, as returned by
/// `DatabentoHistoricalClient::get_dataset_range`.
///
/// Both bounds are the `to_string()` rendering of the values in the Databento
/// response. NOTE(review): the exact textual format is determined by the
/// response's timestamp type — confirm before parsing these strings.
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// Start of the available range.
    pub start: String,
    /// End of the available range.
    pub end: String,
}
86
87impl DatabentoHistoricalClient {
88 pub fn new(
94 key: String,
95 publishers_filepath: PathBuf,
96 clock: &'static AtomicTime,
97 use_exchange_as_venue: bool,
98 ) -> anyhow::Result<Self> {
99 let client = databento::HistoricalClient::builder()
100 .user_agent_extension(NAUTILUS_USER_AGENT.into())
101 .key(key.clone())
102 .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
103 .build()
104 .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
105
106 let file_content = fs::read_to_string(publishers_filepath)?;
107 let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
108
109 let publisher_venue_map = publishers_vec
110 .into_iter()
111 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
112 .collect::<IndexMap<u16, Venue>>();
113
114 Ok(Self {
115 clock,
116 inner: Arc::new(Mutex::new(client)),
117 publisher_venue_map: Arc::new(publisher_venue_map),
118 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
119 key,
120 use_exchange_as_venue,
121 })
122 }
123
124 pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
130 let mut client = self.inner.lock().await;
131 let response = client
132 .metadata()
133 .get_dataset_range(dataset)
134 .await
135 .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
136
137 Ok(DatasetRange {
138 start: response.start.to_string(),
139 end: response.end.to_string(),
140 })
141 }
142
143 pub async fn get_range_instruments(
149 &self,
150 params: RangeQueryParams,
151 ) -> anyhow::Result<Vec<InstrumentAny>> {
152 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
153 check_consistent_symbology(&symbols)?;
154
155 let first_symbol = params
156 .symbols
157 .first()
158 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
159 let stype_in = infer_symbology_type(first_symbol);
160 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
161 let time_range = get_date_time_range(params.start, end)?;
162
163 let range_params = GetRangeParams::builder()
164 .dataset(params.dataset)
165 .date_time_range(time_range)
166 .symbols(symbols)
167 .stype_in(stype_in)
168 .schema(dbn::Schema::Definition)
169 .limit(params.limit.and_then(NonZeroU64::new))
170 .build();
171
172 let mut client = self.inner.lock().await;
173 let mut decoder = client
174 .timeseries()
175 .get_range(&range_params)
176 .await
177 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
178
179 let metadata = decoder.metadata().clone();
180 let mut metadata_cache = MetadataCache::new(metadata);
181 let mut instruments = Vec::new();
182
183 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
184 let record = dbn::RecordRef::from(msg);
185 let sym_map = self
186 .symbol_venue_map
187 .read()
188 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
189 let mut instrument_id = decode_nautilus_instrument_id(
190 &record,
191 &mut metadata_cache,
192 &self.publisher_venue_map,
193 &sym_map,
194 )?;
195
196 if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
197 let exchange = msg
198 .exchange()
199 .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
200 let venue = Venue::from_code(exchange)
201 .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
202 instrument_id.venue = venue;
203 }
204
205 match decode_instrument_def_msg(msg, instrument_id, None) {
206 Ok(instrument) => instruments.push(instrument),
207 Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
208 }
209 }
210
211 Ok(instruments)
212 }
213
214 pub async fn get_range_quotes(
220 &self,
221 params: RangeQueryParams,
222 schema: Option<String>,
223 ) -> anyhow::Result<Vec<QuoteTick>> {
224 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
225 check_consistent_symbology(&symbols)?;
226
227 let first_symbol = params
228 .symbols
229 .first()
230 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
231 let stype_in = infer_symbology_type(first_symbol);
232 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
233 let time_range = get_date_time_range(params.start, end)?;
234 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
235 let dbn_schema = dbn::Schema::from_str(&schema)?;
236
237 match dbn_schema {
238 dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
239 _ => anyhow::bail!("Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m"),
240 }
241
242 let range_params = GetRangeParams::builder()
243 .dataset(params.dataset)
244 .date_time_range(time_range)
245 .symbols(symbols)
246 .stype_in(stype_in)
247 .schema(dbn_schema)
248 .limit(params.limit.and_then(NonZeroU64::new))
249 .build();
250
251 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
252
253 let mut client = self.inner.lock().await;
254 let mut decoder = client
255 .timeseries()
256 .get_range(&range_params)
257 .await
258 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
259
260 let metadata = decoder.metadata().clone();
261 let mut metadata_cache = MetadataCache::new(metadata);
262 let mut result: Vec<QuoteTick> = Vec::new();
263
264 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
265 let sym_map = self
266 .symbol_venue_map
267 .read()
268 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
269 let instrument_id = decode_nautilus_instrument_id(
270 &record,
271 &mut metadata_cache,
272 &self.publisher_venue_map,
273 &sym_map,
274 )?;
275
276 let (data, _) = decode_record(
277 &record,
278 instrument_id,
279 price_precision,
280 None,
281 false, true,
283 )?;
284
285 match data {
286 Some(Data::Quote(quote)) => {
287 result.push(quote);
288 Ok(())
289 }
290 _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
291 }
292 };
293
294 match dbn_schema {
295 dbn::Schema::Mbp1 => {
296 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
297 process_record(dbn::RecordRef::from(msg))?;
298 }
299 }
300 dbn::Schema::Bbo1M => {
301 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
302 process_record(dbn::RecordRef::from(msg))?;
303 }
304 }
305 dbn::Schema::Bbo1S => {
306 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
307 process_record(dbn::RecordRef::from(msg))?;
308 }
309 }
310 _ => anyhow::bail!("Invalid schema {dbn_schema}"),
311 }
312
313 Ok(result)
314 }
315
316 pub async fn get_range_trades(
322 &self,
323 params: RangeQueryParams,
324 ) -> anyhow::Result<Vec<TradeTick>> {
325 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
326 check_consistent_symbology(&symbols)?;
327
328 let first_symbol = params
329 .symbols
330 .first()
331 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
332 let stype_in = infer_symbology_type(first_symbol);
333 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
334 let time_range = get_date_time_range(params.start, end)?;
335
336 let range_params = GetRangeParams::builder()
337 .dataset(params.dataset)
338 .date_time_range(time_range)
339 .symbols(symbols)
340 .stype_in(stype_in)
341 .schema(dbn::Schema::Trades)
342 .limit(params.limit.and_then(NonZeroU64::new))
343 .build();
344
345 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
346
347 let mut client = self.inner.lock().await;
348 let mut decoder = client
349 .timeseries()
350 .get_range(&range_params)
351 .await
352 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
353
354 let metadata = decoder.metadata().clone();
355 let mut metadata_cache = MetadataCache::new(metadata);
356 let mut result: Vec<TradeTick> = Vec::new();
357
358 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
359 let record = dbn::RecordRef::from(msg);
360 let sym_map = self
361 .symbol_venue_map
362 .read()
363 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
364 let instrument_id = decode_nautilus_instrument_id(
365 &record,
366 &mut metadata_cache,
367 &self.publisher_venue_map,
368 &sym_map,
369 )?;
370
371 let (data, _) = decode_record(
372 &record,
373 instrument_id,
374 price_precision,
375 None,
376 false, true,
378 )?;
379
380 match data {
381 Some(Data::Trade(trade)) => {
382 result.push(trade);
383 }
384 _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
385 }
386 }
387
388 Ok(result)
389 }
390
391 pub async fn get_range_bars(
397 &self,
398 params: RangeQueryParams,
399 aggregation: BarAggregation,
400 timestamp_on_close: bool,
401 ) -> anyhow::Result<Vec<Bar>> {
402 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
403 check_consistent_symbology(&symbols)?;
404
405 let first_symbol = params
406 .symbols
407 .first()
408 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
409 let stype_in = infer_symbology_type(first_symbol);
410 let schema = match aggregation {
411 BarAggregation::Second => dbn::Schema::Ohlcv1S,
412 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
413 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
414 BarAggregation::Day => dbn::Schema::Ohlcv1D,
415 _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
416 };
417
418 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
419 let time_range = get_date_time_range(params.start, end)?;
420
421 let range_params = GetRangeParams::builder()
422 .dataset(params.dataset)
423 .date_time_range(time_range)
424 .symbols(symbols)
425 .stype_in(stype_in)
426 .schema(schema)
427 .limit(params.limit.and_then(NonZeroU64::new))
428 .build();
429
430 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
431
432 let mut client = self.inner.lock().await;
433 let mut decoder = client
434 .timeseries()
435 .get_range(&range_params)
436 .await
437 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
438
439 let metadata = decoder.metadata().clone();
440 let mut metadata_cache = MetadataCache::new(metadata);
441 let mut result: Vec<Bar> = Vec::new();
442
443 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
444 let record = dbn::RecordRef::from(msg);
445 let sym_map = self
446 .symbol_venue_map
447 .read()
448 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
449 let instrument_id = decode_nautilus_instrument_id(
450 &record,
451 &mut metadata_cache,
452 &self.publisher_venue_map,
453 &sym_map,
454 )?;
455
456 let (data, _) = decode_record(
457 &record,
458 instrument_id,
459 price_precision,
460 None,
461 false, timestamp_on_close,
463 )?;
464
465 match data {
466 Some(Data::Bar(bar)) => {
467 result.push(bar);
468 }
469 _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
470 }
471 }
472
473 Ok(result)
474 }
475
476 pub async fn get_range_imbalance(
482 &self,
483 params: RangeQueryParams,
484 ) -> anyhow::Result<Vec<DatabentoImbalance>> {
485 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
486 check_consistent_symbology(&symbols)?;
487
488 let first_symbol = params
489 .symbols
490 .first()
491 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
492 let stype_in = infer_symbology_type(first_symbol);
493 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
494 let time_range = get_date_time_range(params.start, end)?;
495
496 let range_params = GetRangeParams::builder()
497 .dataset(params.dataset)
498 .date_time_range(time_range)
499 .symbols(symbols)
500 .stype_in(stype_in)
501 .schema(dbn::Schema::Imbalance)
502 .limit(params.limit.and_then(NonZeroU64::new))
503 .build();
504
505 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
506
507 let mut client = self.inner.lock().await;
508 let mut decoder = client
509 .timeseries()
510 .get_range(&range_params)
511 .await
512 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
513
514 let metadata = decoder.metadata().clone();
515 let mut metadata_cache = MetadataCache::new(metadata);
516 let mut result: Vec<DatabentoImbalance> = Vec::new();
517
518 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
519 let record = dbn::RecordRef::from(msg);
520 let sym_map = self
521 .symbol_venue_map
522 .read()
523 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
524 let instrument_id = decode_nautilus_instrument_id(
525 &record,
526 &mut metadata_cache,
527 &self.publisher_venue_map,
528 &sym_map,
529 )?;
530
531 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
532 result.push(imbalance);
533 }
534
535 Ok(result)
536 }
537
538 pub async fn get_range_statistics(
544 &self,
545 params: RangeQueryParams,
546 ) -> anyhow::Result<Vec<DatabentoStatistics>> {
547 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
548 check_consistent_symbology(&symbols)?;
549
550 let first_symbol = params
551 .symbols
552 .first()
553 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
554 let stype_in = infer_symbology_type(first_symbol);
555 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
556 let time_range = get_date_time_range(params.start, end)?;
557
558 let range_params = GetRangeParams::builder()
559 .dataset(params.dataset)
560 .date_time_range(time_range)
561 .symbols(symbols)
562 .stype_in(stype_in)
563 .schema(dbn::Schema::Statistics)
564 .limit(params.limit.and_then(NonZeroU64::new))
565 .build();
566
567 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
568
569 let mut client = self.inner.lock().await;
570 let mut decoder = client
571 .timeseries()
572 .get_range(&range_params)
573 .await
574 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
575
576 let metadata = decoder.metadata().clone();
577 let mut metadata_cache = MetadataCache::new(metadata);
578 let mut result: Vec<DatabentoStatistics> = Vec::new();
579
580 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
581 let record = dbn::RecordRef::from(msg);
582 let sym_map = self
583 .symbol_venue_map
584 .read()
585 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
586 let instrument_id = decode_nautilus_instrument_id(
587 &record,
588 &mut metadata_cache,
589 &self.publisher_venue_map,
590 &sym_map,
591 )?;
592
593 let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
594 result.push(statistics);
595 }
596
597 Ok(result)
598 }
599
600 pub async fn get_range_status(
606 &self,
607 params: RangeQueryParams,
608 ) -> anyhow::Result<Vec<InstrumentStatus>> {
609 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
610 check_consistent_symbology(&symbols)?;
611
612 let first_symbol = params
613 .symbols
614 .first()
615 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
616 let stype_in = infer_symbology_type(first_symbol);
617 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
618 let time_range = get_date_time_range(params.start, end)?;
619
620 let range_params = GetRangeParams::builder()
621 .dataset(params.dataset)
622 .date_time_range(time_range)
623 .symbols(symbols)
624 .stype_in(stype_in)
625 .schema(dbn::Schema::Status)
626 .limit(params.limit.and_then(NonZeroU64::new))
627 .build();
628
629 let mut client = self.inner.lock().await;
630 let mut decoder = client
631 .timeseries()
632 .get_range(&range_params)
633 .await
634 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
635
636 let metadata = decoder.metadata().clone();
637 let mut metadata_cache = MetadataCache::new(metadata);
638 let mut result: Vec<InstrumentStatus> = Vec::new();
639
640 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
641 let record = dbn::RecordRef::from(msg);
642 let sym_map = self
643 .symbol_venue_map
644 .read()
645 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
646 let instrument_id = decode_nautilus_instrument_id(
647 &record,
648 &mut metadata_cache,
649 &self.publisher_venue_map,
650 &sym_map,
651 )?;
652
653 let status = decode_status_msg(msg, instrument_id, None)?;
654 result.push(status);
655 }
656
657 Ok(result)
658 }
659
660 pub fn prepare_symbols_from_instrument_ids(
666 &self,
667 instrument_ids: &[InstrumentId],
668 ) -> anyhow::Result<Vec<String>> {
669 let mut symbol_venue_map = self
670 .symbol_venue_map
671 .write()
672 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
673
674 let symbols: Vec<String> = instrument_ids
675 .iter()
676 .map(|instrument_id| {
677 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
678 })
679 .collect();
680
681 Ok(symbols)
682 }
683}