1use std::{
19 fs,
20 num::NonZeroU64,
21 path::PathBuf,
22 str::FromStr,
23 sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28 dbn::{self, decode::DbnMetadata},
29 historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
35 enums::BarAggregation,
36 identifiers::{InstrumentId, Symbol, Venue},
37 instruments::InstrumentAny,
38 types::Currency,
39};
40
41use crate::{
42 common::get_date_time_range,
43 decode::{
44 decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
45 decode_record, decode_statistics_msg, decode_status_msg,
46 },
47 symbology::{
48 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
49 infer_symbology_type, instrument_id_to_symbol_string,
50 },
51 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
52};
53
54#[derive(Debug, Clone)]
59pub struct DatabentoHistoricalClient {
60 pub key: String,
61 clock: &'static AtomicTime,
62 inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
63 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
64 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
65 use_exchange_as_venue: bool,
66}
67
68#[derive(Debug)]
70pub struct RangeQueryParams {
71 pub dataset: String,
72 pub symbols: Vec<String>,
73 pub start: UnixNanos,
74 pub end: Option<UnixNanos>,
75 pub limit: Option<u64>,
76 pub price_precision: Option<u8>,
77}
78
79#[derive(Debug, Clone)]
81pub struct DatasetRange {
82 pub start: String,
83 pub end: String,
84}
85
86impl DatabentoHistoricalClient {
87 pub fn new(
93 key: String,
94 publishers_filepath: PathBuf,
95 clock: &'static AtomicTime,
96 use_exchange_as_venue: bool,
97 ) -> anyhow::Result<Self> {
98 let client = databento::HistoricalClient::builder()
99 .user_agent_extension(NAUTILUS_USER_AGENT.into())
100 .key(key.clone())
101 .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
102 .build()
103 .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
104
105 let file_content = fs::read_to_string(publishers_filepath)?;
106 let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
107
108 let publisher_venue_map = publishers_vec
109 .into_iter()
110 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
111 .collect::<IndexMap<u16, Venue>>();
112
113 Ok(Self {
114 clock,
115 inner: Arc::new(tokio::sync::Mutex::new(client)),
116 publisher_venue_map: Arc::new(publisher_venue_map),
117 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
118 key,
119 use_exchange_as_venue,
120 })
121 }
122
123 pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
129 let mut client = self.inner.lock().await;
130 let response = client
131 .metadata()
132 .get_dataset_range(dataset)
133 .await
134 .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
135
136 Ok(DatasetRange {
137 start: response.start.to_string(),
138 end: response.end.to_string(),
139 })
140 }
141
142 pub async fn get_range_instruments(
148 &self,
149 params: RangeQueryParams,
150 ) -> anyhow::Result<Vec<InstrumentAny>> {
151 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
152 check_consistent_symbology(&symbols)?;
153
154 let first_symbol = params
155 .symbols
156 .first()
157 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
158 let stype_in = infer_symbology_type(first_symbol);
159 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
160 let time_range = get_date_time_range(params.start, end)?;
161
162 let range_params = GetRangeParams::builder()
163 .dataset(params.dataset)
164 .date_time_range(time_range)
165 .symbols(symbols)
166 .stype_in(stype_in)
167 .schema(dbn::Schema::Definition)
168 .limit(params.limit.and_then(NonZeroU64::new))
169 .build();
170
171 let mut client = self.inner.lock().await;
172 let mut decoder = client
173 .timeseries()
174 .get_range(&range_params)
175 .await
176 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
177
178 let metadata = decoder.metadata().clone();
179 let mut metadata_cache = MetadataCache::new(metadata);
180 let mut instruments = Vec::new();
181
182 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183 let record = dbn::RecordRef::from(msg);
184 let sym_map = self
185 .symbol_venue_map
186 .read()
187 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
188 let mut instrument_id = decode_nautilus_instrument_id(
189 &record,
190 &mut metadata_cache,
191 &self.publisher_venue_map,
192 &sym_map,
193 )?;
194
195 if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
196 let exchange = msg
197 .exchange()
198 .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
199 let venue = Venue::from_code(exchange)
200 .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
201 instrument_id.venue = venue;
202 }
203
204 match decode_instrument_def_msg(msg, instrument_id, None) {
205 Ok(instrument) => instruments.push(instrument),
206 Err(e) => log::error!("Failed to decode instrument: {e:?}"),
207 }
208 }
209
210 Ok(instruments)
211 }
212
213 pub async fn get_range_quotes(
219 &self,
220 params: RangeQueryParams,
221 schema: Option<String>,
222 ) -> anyhow::Result<Vec<QuoteTick>> {
223 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
224 check_consistent_symbology(&symbols)?;
225
226 let first_symbol = params
227 .symbols
228 .first()
229 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
230 let stype_in = infer_symbology_type(first_symbol);
231 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
232 let time_range = get_date_time_range(params.start, end)?;
233 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
234 let dbn_schema = dbn::Schema::from_str(&schema)?;
235
236 match dbn_schema {
237 dbn::Schema::Mbp1
238 | dbn::Schema::Bbo1S
239 | dbn::Schema::Bbo1M
240 | dbn::Schema::Cmbp1
241 | dbn::Schema::Cbbo1S
242 | dbn::Schema::Cbbo1M => (),
243 _ => anyhow::bail!(
244 "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
245 ),
246 }
247
248 let range_params = GetRangeParams::builder()
249 .dataset(params.dataset)
250 .date_time_range(time_range)
251 .symbols(symbols)
252 .stype_in(stype_in)
253 .schema(dbn_schema)
254 .limit(params.limit.and_then(NonZeroU64::new))
255 .build();
256
257 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
258
259 let mut client = self.inner.lock().await;
260 let mut decoder = client
261 .timeseries()
262 .get_range(&range_params)
263 .await
264 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
265
266 let metadata = decoder.metadata().clone();
267 let mut metadata_cache = MetadataCache::new(metadata);
268 let mut result: Vec<QuoteTick> = Vec::new();
269
270 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
271 let sym_map = self
272 .symbol_venue_map
273 .read()
274 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
275 let instrument_id = decode_nautilus_instrument_id(
276 &record,
277 &mut metadata_cache,
278 &self.publisher_venue_map,
279 &sym_map,
280 )?;
281
282 let (data, _) = decode_record(
283 &record,
284 instrument_id,
285 price_precision,
286 None,
287 false, true,
289 )?;
290
291 match data {
292 Some(Data::Quote(quote)) => result.push(quote),
293 None => {} _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
295 }
296 Ok(())
297 };
298
299 match dbn_schema {
300 dbn::Schema::Mbp1 => {
301 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
302 process_record(dbn::RecordRef::from(msg))?;
303 }
304 }
305 dbn::Schema::Cmbp1 => {
306 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Cmbp1Msg>().await {
307 process_record(dbn::RecordRef::from(msg))?;
308 }
309 }
310 dbn::Schema::Bbo1M => {
311 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
312 process_record(dbn::RecordRef::from(msg))?;
313 }
314 }
315 dbn::Schema::Bbo1S => {
316 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
317 process_record(dbn::RecordRef::from(msg))?;
318 }
319 }
320 dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
321 while let Ok(Some(msg)) = decoder.decode_record::<dbn::CbboMsg>().await {
322 process_record(dbn::RecordRef::from(msg))?;
323 }
324 }
325 _ => anyhow::bail!("Invalid schema {dbn_schema}"),
326 }
327
328 Ok(result)
329 }
330
331 pub async fn get_range_order_book_depth10(
337 &self,
338 params: RangeQueryParams,
339 depth: Option<usize>,
340 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
341 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
342 check_consistent_symbology(&symbols)?;
343
344 let first_symbol = params
345 .symbols
346 .first()
347 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
348 let stype_in = infer_symbology_type(first_symbol);
349 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
350 let time_range = get_date_time_range(params.start, end)?;
351
352 let _depth = depth.unwrap_or(10);
354 if _depth != 10 {
355 anyhow::bail!("Only depth=10 is currently supported for order book depths");
356 }
357
358 let range_params = GetRangeParams::builder()
359 .dataset(params.dataset)
360 .date_time_range(time_range)
361 .symbols(symbols)
362 .stype_in(stype_in)
363 .schema(dbn::Schema::Mbp10)
364 .limit(params.limit.and_then(NonZeroU64::new))
365 .build();
366
367 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
368
369 let mut client = self.inner.lock().await;
370 let mut decoder = client
371 .timeseries()
372 .get_range(&range_params)
373 .await
374 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
375
376 let metadata = decoder.metadata().clone();
377 let mut metadata_cache = MetadataCache::new(metadata);
378 let mut result: Vec<OrderBookDepth10> = Vec::new();
379
380 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
381 let sym_map = self
382 .symbol_venue_map
383 .read()
384 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
385 let instrument_id = decode_nautilus_instrument_id(
386 &record,
387 &mut metadata_cache,
388 &self.publisher_venue_map,
389 &sym_map,
390 )?;
391
392 if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
393 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
394 result.push(depth);
395 }
396
397 Ok(())
398 };
399
400 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
401 process_record(dbn::RecordRef::from(msg))?;
402 }
403
404 Ok(result)
405 }
406
407 pub async fn get_range_order_book_deltas(
413 &self,
414 params: RangeQueryParams,
415 ) -> anyhow::Result<Vec<OrderBookDelta>> {
416 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
417 check_consistent_symbology(&symbols)?;
418
419 let first_symbol = params
420 .symbols
421 .first()
422 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
423 let stype_in = infer_symbology_type(first_symbol);
424 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
425 let time_range = get_date_time_range(params.start, end)?;
426
427 let range_params = GetRangeParams::builder()
428 .dataset(params.dataset)
429 .date_time_range(time_range)
430 .symbols(symbols)
431 .stype_in(stype_in)
432 .schema(dbn::Schema::Mbo)
433 .limit(params.limit.and_then(NonZeroU64::new))
434 .build();
435
436 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
437
438 let mut client = self.inner.lock().await;
439 let mut decoder = client
440 .timeseries()
441 .get_range(&range_params)
442 .await
443 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
444
445 let metadata = decoder.metadata().clone();
446 let mut metadata_cache = MetadataCache::new(metadata);
447 let mut result: Vec<OrderBookDelta> = Vec::new();
448
449 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
450 let sym_map = self
451 .symbol_venue_map
452 .read()
453 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
454 let instrument_id = decode_nautilus_instrument_id(
455 &record,
456 &mut metadata_cache,
457 &self.publisher_venue_map,
458 &sym_map,
459 )?;
460
461 if let Some(msg) = record.get::<dbn::MboMsg>() {
462 let (delta, _trade) =
463 decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
464 if let Some(delta) = delta {
465 result.push(delta);
466 }
467 }
468
469 Ok(())
470 };
471
472 while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await {
473 process_record(dbn::RecordRef::from(msg))?;
474 }
475
476 Ok(result)
477 }
478
479 pub async fn get_range_trades(
485 &self,
486 params: RangeQueryParams,
487 ) -> anyhow::Result<Vec<TradeTick>> {
488 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
489 check_consistent_symbology(&symbols)?;
490
491 let first_symbol = params
492 .symbols
493 .first()
494 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
495 let stype_in = infer_symbology_type(first_symbol);
496 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
497 let time_range = get_date_time_range(params.start, end)?;
498
499 let range_params = GetRangeParams::builder()
500 .dataset(params.dataset)
501 .date_time_range(time_range)
502 .symbols(symbols)
503 .stype_in(stype_in)
504 .schema(dbn::Schema::Trades)
505 .limit(params.limit.and_then(NonZeroU64::new))
506 .build();
507
508 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
509
510 let mut client = self.inner.lock().await;
511 let mut decoder = client
512 .timeseries()
513 .get_range(&range_params)
514 .await
515 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
516
517 let metadata = decoder.metadata().clone();
518 let mut metadata_cache = MetadataCache::new(metadata);
519 let mut result: Vec<TradeTick> = Vec::new();
520
521 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
522 let record = dbn::RecordRef::from(msg);
523 let sym_map = self
524 .symbol_venue_map
525 .read()
526 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
527 let instrument_id = decode_nautilus_instrument_id(
528 &record,
529 &mut metadata_cache,
530 &self.publisher_venue_map,
531 &sym_map,
532 )?;
533
534 let (data, _) = decode_record(
535 &record,
536 instrument_id,
537 price_precision,
538 None,
539 false, true,
541 )?;
542
543 match data {
544 Some(Data::Trade(trade)) => {
545 result.push(trade);
546 }
547 _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
548 }
549 }
550
551 Ok(result)
552 }
553
554 pub async fn get_range_bars(
560 &self,
561 params: RangeQueryParams,
562 aggregation: BarAggregation,
563 timestamp_on_close: bool,
564 ) -> anyhow::Result<Vec<Bar>> {
565 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
566 check_consistent_symbology(&symbols)?;
567
568 let first_symbol = params
569 .symbols
570 .first()
571 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
572 let stype_in = infer_symbology_type(first_symbol);
573 let schema = match aggregation {
574 BarAggregation::Second => dbn::Schema::Ohlcv1S,
575 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
576 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
577 BarAggregation::Day => dbn::Schema::Ohlcv1D,
578 _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
579 };
580
581 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
582 let time_range = get_date_time_range(params.start, end)?;
583
584 let range_params = GetRangeParams::builder()
585 .dataset(params.dataset)
586 .date_time_range(time_range)
587 .symbols(symbols)
588 .stype_in(stype_in)
589 .schema(schema)
590 .limit(params.limit.and_then(NonZeroU64::new))
591 .build();
592
593 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
594
595 let mut client = self.inner.lock().await;
596 let mut decoder = client
597 .timeseries()
598 .get_range(&range_params)
599 .await
600 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
601
602 let metadata = decoder.metadata().clone();
603 let mut metadata_cache = MetadataCache::new(metadata);
604 let mut result: Vec<Bar> = Vec::new();
605
606 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
607 let record = dbn::RecordRef::from(msg);
608 let sym_map = self
609 .symbol_venue_map
610 .read()
611 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
612 let instrument_id = decode_nautilus_instrument_id(
613 &record,
614 &mut metadata_cache,
615 &self.publisher_venue_map,
616 &sym_map,
617 )?;
618
619 let (data, _) = decode_record(
620 &record,
621 instrument_id,
622 price_precision,
623 None,
624 false, timestamp_on_close,
626 )?;
627
628 match data {
629 Some(Data::Bar(bar)) => {
630 result.push(bar);
631 }
632 _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
633 }
634 }
635
636 Ok(result)
637 }
638
639 pub async fn get_range_imbalance(
645 &self,
646 params: RangeQueryParams,
647 ) -> anyhow::Result<Vec<DatabentoImbalance>> {
648 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
649 check_consistent_symbology(&symbols)?;
650
651 let first_symbol = params
652 .symbols
653 .first()
654 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
655 let stype_in = infer_symbology_type(first_symbol);
656 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
657 let time_range = get_date_time_range(params.start, end)?;
658
659 let range_params = GetRangeParams::builder()
660 .dataset(params.dataset)
661 .date_time_range(time_range)
662 .symbols(symbols)
663 .stype_in(stype_in)
664 .schema(dbn::Schema::Imbalance)
665 .limit(params.limit.and_then(NonZeroU64::new))
666 .build();
667
668 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
669
670 let mut client = self.inner.lock().await;
671 let mut decoder = client
672 .timeseries()
673 .get_range(&range_params)
674 .await
675 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
676
677 let metadata = decoder.metadata().clone();
678 let mut metadata_cache = MetadataCache::new(metadata);
679 let mut result: Vec<DatabentoImbalance> = Vec::new();
680
681 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
682 let record = dbn::RecordRef::from(msg);
683 let sym_map = self
684 .symbol_venue_map
685 .read()
686 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
687 let instrument_id = decode_nautilus_instrument_id(
688 &record,
689 &mut metadata_cache,
690 &self.publisher_venue_map,
691 &sym_map,
692 )?;
693
694 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
695 result.push(imbalance);
696 }
697
698 Ok(result)
699 }
700
701 pub async fn get_range_statistics(
707 &self,
708 params: RangeQueryParams,
709 ) -> anyhow::Result<Vec<DatabentoStatistics>> {
710 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
711 check_consistent_symbology(&symbols)?;
712
713 let first_symbol = params
714 .symbols
715 .first()
716 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
717 let stype_in = infer_symbology_type(first_symbol);
718 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
719 let time_range = get_date_time_range(params.start, end)?;
720
721 let range_params = GetRangeParams::builder()
722 .dataset(params.dataset)
723 .date_time_range(time_range)
724 .symbols(symbols)
725 .stype_in(stype_in)
726 .schema(dbn::Schema::Statistics)
727 .limit(params.limit.and_then(NonZeroU64::new))
728 .build();
729
730 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
731
732 let mut client = self.inner.lock().await;
733 let mut decoder = client
734 .timeseries()
735 .get_range(&range_params)
736 .await
737 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
738
739 let metadata = decoder.metadata().clone();
740 let mut metadata_cache = MetadataCache::new(metadata);
741 let mut result: Vec<DatabentoStatistics> = Vec::new();
742
743 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
744 let record = dbn::RecordRef::from(msg);
745 let sym_map = self
746 .symbol_venue_map
747 .read()
748 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
749 let instrument_id = decode_nautilus_instrument_id(
750 &record,
751 &mut metadata_cache,
752 &self.publisher_venue_map,
753 &sym_map,
754 )?;
755
756 let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
757 result.push(statistics);
758 }
759
760 Ok(result)
761 }
762
763 pub async fn get_range_status(
769 &self,
770 params: RangeQueryParams,
771 ) -> anyhow::Result<Vec<InstrumentStatus>> {
772 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
773 check_consistent_symbology(&symbols)?;
774
775 let first_symbol = params
776 .symbols
777 .first()
778 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
779 let stype_in = infer_symbology_type(first_symbol);
780 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
781 let time_range = get_date_time_range(params.start, end)?;
782
783 let range_params = GetRangeParams::builder()
784 .dataset(params.dataset)
785 .date_time_range(time_range)
786 .symbols(symbols)
787 .stype_in(stype_in)
788 .schema(dbn::Schema::Status)
789 .limit(params.limit.and_then(NonZeroU64::new))
790 .build();
791
792 let mut client = self.inner.lock().await;
793 let mut decoder = client
794 .timeseries()
795 .get_range(&range_params)
796 .await
797 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
798
799 let metadata = decoder.metadata().clone();
800 let mut metadata_cache = MetadataCache::new(metadata);
801 let mut result: Vec<InstrumentStatus> = Vec::new();
802
803 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
804 let record = dbn::RecordRef::from(msg);
805 let sym_map = self
806 .symbol_venue_map
807 .read()
808 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
809 let instrument_id = decode_nautilus_instrument_id(
810 &record,
811 &mut metadata_cache,
812 &self.publisher_venue_map,
813 &sym_map,
814 )?;
815
816 let status = decode_status_msg(msg, instrument_id, None)?;
817 result.push(status);
818 }
819
820 Ok(result)
821 }
822
823 pub fn prepare_symbols_from_instrument_ids(
829 &self,
830 instrument_ids: &[InstrumentId],
831 ) -> anyhow::Result<Vec<String>> {
832 let mut symbol_venue_map = self
833 .symbol_venue_map
834 .write()
835 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
836
837 let symbols: Vec<String> = instrument_ids
838 .iter()
839 .map(|instrument_id| {
840 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
841 })
842 .collect();
843
844 Ok(symbols)
845 }
846}