nautilus_databento/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core Databento historical client for both Rust and Python usage.
17
18use std::{
19    fs,
20    num::NonZeroU64,
21    path::PathBuf,
22    str::FromStr,
23    sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28    dbn::{self, decode::DbnMetadata},
29    historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34    data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
35    enums::BarAggregation,
36    identifiers::{InstrumentId, Symbol, Venue},
37    instruments::InstrumentAny,
38    types::Currency,
39};
40use tokio::sync::Mutex;
41
42use crate::{
43    common::get_date_time_range,
44    decode::{
45        decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
46        decode_statistics_msg, decode_status_msg,
47    },
48    symbology::{
49        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
50        infer_symbology_type, instrument_id_to_symbol_string,
51    },
52    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
53};
54
55/// Core Databento historical client for fetching historical market data.
56///
57/// This client provides both synchronous and asynchronous interfaces for fetching
58/// various types of historical market data from Databento.
59#[derive(Debug, Clone)]
60pub struct DatabentoHistoricalClient {
61    pub key: String,
62    clock: &'static AtomicTime,
63    inner: Arc<Mutex<databento::HistoricalClient>>,
64    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
65    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
66    use_exchange_as_venue: bool,
67}
68
69/// Parameters for range queries to Databento historical API.
70#[derive(Debug)]
71pub struct RangeQueryParams {
72    pub dataset: String,
73    pub symbols: Vec<String>,
74    pub start: UnixNanos,
75    pub end: Option<UnixNanos>,
76    pub limit: Option<u64>,
77    pub price_precision: Option<u8>,
78}
79
/// Result containing dataset date range information.
///
/// Both bounds are the string representations of the values returned by the
/// Databento metadata API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatasetRange {
    /// Start of the available range for the dataset.
    pub start: String,
    /// End of the available range for the dataset.
    pub end: String,
}
86
87impl DatabentoHistoricalClient {
88    /// Creates a new [`DatabentoHistoricalClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if client creation or publisher loading fails.
93    pub fn new(
94        key: String,
95        publishers_filepath: PathBuf,
96        clock: &'static AtomicTime,
97        use_exchange_as_venue: bool,
98    ) -> anyhow::Result<Self> {
99        let client = databento::HistoricalClient::builder()
100            .user_agent_extension(NAUTILUS_USER_AGENT.into())
101            .key(key.clone())
102            .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
103            .build()
104            .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
105
106        let file_content = fs::read_to_string(publishers_filepath)?;
107        let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
108
109        let publisher_venue_map = publishers_vec
110            .into_iter()
111            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
112            .collect::<IndexMap<u16, Venue>>();
113
114        Ok(Self {
115            clock,
116            inner: Arc::new(Mutex::new(client)),
117            publisher_venue_map: Arc::new(publisher_venue_map),
118            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
119            key,
120            use_exchange_as_venue,
121        })
122    }
123
124    /// Gets the date range for a specific dataset.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the API request fails.
129    pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
130        let mut client = self.inner.lock().await;
131        let response = client
132            .metadata()
133            .get_dataset_range(dataset)
134            .await
135            .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
136
137        Ok(DatasetRange {
138            start: response.start.to_string(),
139            end: response.end.to_string(),
140        })
141    }
142
143    /// Fetches instrument definitions for the given parameters.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the API request or data processing fails.
148    pub async fn get_range_instruments(
149        &self,
150        params: RangeQueryParams,
151    ) -> anyhow::Result<Vec<InstrumentAny>> {
152        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
153        check_consistent_symbology(&symbols)?;
154
155        let first_symbol = params
156            .symbols
157            .first()
158            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
159        let stype_in = infer_symbology_type(first_symbol);
160        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
161        let time_range = get_date_time_range(params.start, end)?;
162
163        let range_params = GetRangeParams::builder()
164            .dataset(params.dataset)
165            .date_time_range(time_range)
166            .symbols(symbols)
167            .stype_in(stype_in)
168            .schema(dbn::Schema::Definition)
169            .limit(params.limit.and_then(NonZeroU64::new))
170            .build();
171
172        let mut client = self.inner.lock().await;
173        let mut decoder = client
174            .timeseries()
175            .get_range(&range_params)
176            .await
177            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
178
179        let metadata = decoder.metadata().clone();
180        let mut metadata_cache = MetadataCache::new(metadata);
181        let mut instruments = Vec::new();
182
183        while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
184            let record = dbn::RecordRef::from(msg);
185            let sym_map = self
186                .symbol_venue_map
187                .read()
188                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
189            let mut instrument_id = decode_nautilus_instrument_id(
190                &record,
191                &mut metadata_cache,
192                &self.publisher_venue_map,
193                &sym_map,
194            )?;
195
196            if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
197                let exchange = msg
198                    .exchange()
199                    .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
200                let venue = Venue::from_code(exchange)
201                    .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
202                instrument_id.venue = venue;
203            }
204
205            match decode_instrument_def_msg(msg, instrument_id, None) {
206                Ok(instrument) => instruments.push(instrument),
207                Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
208            }
209        }
210
211        Ok(instruments)
212    }
213
214    /// Fetches quote ticks for the given parameters.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the API request or data processing fails.
219    pub async fn get_range_quotes(
220        &self,
221        params: RangeQueryParams,
222        schema: Option<String>,
223    ) -> anyhow::Result<Vec<QuoteTick>> {
224        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
225        check_consistent_symbology(&symbols)?;
226
227        let first_symbol = params
228            .symbols
229            .first()
230            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
231        let stype_in = infer_symbology_type(first_symbol);
232        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
233        let time_range = get_date_time_range(params.start, end)?;
234        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
235        let dbn_schema = dbn::Schema::from_str(&schema)?;
236
237        match dbn_schema {
238            dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
239            _ => anyhow::bail!("Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m"),
240        }
241
242        let range_params = GetRangeParams::builder()
243            .dataset(params.dataset)
244            .date_time_range(time_range)
245            .symbols(symbols)
246            .stype_in(stype_in)
247            .schema(dbn_schema)
248            .limit(params.limit.and_then(NonZeroU64::new))
249            .build();
250
251        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
252
253        let mut client = self.inner.lock().await;
254        let mut decoder = client
255            .timeseries()
256            .get_range(&range_params)
257            .await
258            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
259
260        let metadata = decoder.metadata().clone();
261        let mut metadata_cache = MetadataCache::new(metadata);
262        let mut result: Vec<QuoteTick> = Vec::new();
263
264        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
265            let sym_map = self
266                .symbol_venue_map
267                .read()
268                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
269            let instrument_id = decode_nautilus_instrument_id(
270                &record,
271                &mut metadata_cache,
272                &self.publisher_venue_map,
273                &sym_map,
274            )?;
275
276            let (data, _) = decode_record(
277                &record,
278                instrument_id,
279                price_precision,
280                None,
281                false, // Don't include trades
282                true,
283            )?;
284
285            match data {
286                Some(Data::Quote(quote)) => {
287                    result.push(quote);
288                    Ok(())
289                }
290                _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
291            }
292        };
293
294        match dbn_schema {
295            dbn::Schema::Mbp1 => {
296                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
297                    process_record(dbn::RecordRef::from(msg))?;
298                }
299            }
300            dbn::Schema::Bbo1M => {
301                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
302                    process_record(dbn::RecordRef::from(msg))?;
303                }
304            }
305            dbn::Schema::Bbo1S => {
306                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
307                    process_record(dbn::RecordRef::from(msg))?;
308                }
309            }
310            _ => anyhow::bail!("Invalid schema {dbn_schema}"),
311        }
312
313        Ok(result)
314    }
315
316    /// Fetches order book depth10 snapshots for the given parameters.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the API request or data processing fails.
321    pub async fn get_range_order_book_depth10(
322        &self,
323        params: RangeQueryParams,
324        depth: Option<usize>,
325    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
326        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
327        check_consistent_symbology(&symbols)?;
328
329        let first_symbol = params
330            .symbols
331            .first()
332            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
333        let stype_in = infer_symbology_type(first_symbol);
334        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
335        let time_range = get_date_time_range(params.start, end)?;
336
337        // For now, only support MBP_10 schema for depth 10
338        let _depth = depth.unwrap_or(10);
339        if _depth != 10 {
340            anyhow::bail!("Only depth=10 is currently supported for order book depths");
341        }
342
343        let range_params = GetRangeParams::builder()
344            .dataset(params.dataset)
345            .date_time_range(time_range)
346            .symbols(symbols)
347            .stype_in(stype_in)
348            .schema(dbn::Schema::Mbp10)
349            .limit(params.limit.and_then(NonZeroU64::new))
350            .build();
351
352        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
353
354        let mut client = self.inner.lock().await;
355        let mut decoder = client
356            .timeseries()
357            .get_range(&range_params)
358            .await
359            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
360
361        let metadata = decoder.metadata().clone();
362        let mut metadata_cache = MetadataCache::new(metadata);
363        let mut result: Vec<OrderBookDepth10> = Vec::new();
364
365        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
366            let sym_map = self
367                .symbol_venue_map
368                .read()
369                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
370            let instrument_id = decode_nautilus_instrument_id(
371                &record,
372                &mut metadata_cache,
373                &self.publisher_venue_map,
374                &sym_map,
375            )?;
376
377            if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
378                let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
379                result.push(depth);
380            }
381
382            Ok(())
383        };
384
385        while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
386            process_record(dbn::RecordRef::from(msg))?;
387        }
388
389        Ok(result)
390    }
391
392    /// Fetches trade ticks for the given parameters.
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if the API request or data processing fails.
397    pub async fn get_range_trades(
398        &self,
399        params: RangeQueryParams,
400    ) -> anyhow::Result<Vec<TradeTick>> {
401        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
402        check_consistent_symbology(&symbols)?;
403
404        let first_symbol = params
405            .symbols
406            .first()
407            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
408        let stype_in = infer_symbology_type(first_symbol);
409        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
410        let time_range = get_date_time_range(params.start, end)?;
411
412        let range_params = GetRangeParams::builder()
413            .dataset(params.dataset)
414            .date_time_range(time_range)
415            .symbols(symbols)
416            .stype_in(stype_in)
417            .schema(dbn::Schema::Trades)
418            .limit(params.limit.and_then(NonZeroU64::new))
419            .build();
420
421        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
422
423        let mut client = self.inner.lock().await;
424        let mut decoder = client
425            .timeseries()
426            .get_range(&range_params)
427            .await
428            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
429
430        let metadata = decoder.metadata().clone();
431        let mut metadata_cache = MetadataCache::new(metadata);
432        let mut result: Vec<TradeTick> = Vec::new();
433
434        while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
435            let record = dbn::RecordRef::from(msg);
436            let sym_map = self
437                .symbol_venue_map
438                .read()
439                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
440            let instrument_id = decode_nautilus_instrument_id(
441                &record,
442                &mut metadata_cache,
443                &self.publisher_venue_map,
444                &sym_map,
445            )?;
446
447            let (data, _) = decode_record(
448                &record,
449                instrument_id,
450                price_precision,
451                None,
452                false, // Not applicable (trade will be decoded regardless)
453                true,
454            )?;
455
456            match data {
457                Some(Data::Trade(trade)) => {
458                    result.push(trade);
459                }
460                _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
461            }
462        }
463
464        Ok(result)
465    }
466
467    /// Fetches bars for the given parameters.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if the API request or data processing fails.
472    pub async fn get_range_bars(
473        &self,
474        params: RangeQueryParams,
475        aggregation: BarAggregation,
476        timestamp_on_close: bool,
477    ) -> anyhow::Result<Vec<Bar>> {
478        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
479        check_consistent_symbology(&symbols)?;
480
481        let first_symbol = params
482            .symbols
483            .first()
484            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
485        let stype_in = infer_symbology_type(first_symbol);
486        let schema = match aggregation {
487            BarAggregation::Second => dbn::Schema::Ohlcv1S,
488            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
489            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
490            BarAggregation::Day => dbn::Schema::Ohlcv1D,
491            _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
492        };
493
494        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
495        let time_range = get_date_time_range(params.start, end)?;
496
497        let range_params = GetRangeParams::builder()
498            .dataset(params.dataset)
499            .date_time_range(time_range)
500            .symbols(symbols)
501            .stype_in(stype_in)
502            .schema(schema)
503            .limit(params.limit.and_then(NonZeroU64::new))
504            .build();
505
506        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
507
508        let mut client = self.inner.lock().await;
509        let mut decoder = client
510            .timeseries()
511            .get_range(&range_params)
512            .await
513            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
514
515        let metadata = decoder.metadata().clone();
516        let mut metadata_cache = MetadataCache::new(metadata);
517        let mut result: Vec<Bar> = Vec::new();
518
519        while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
520            let record = dbn::RecordRef::from(msg);
521            let sym_map = self
522                .symbol_venue_map
523                .read()
524                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
525            let instrument_id = decode_nautilus_instrument_id(
526                &record,
527                &mut metadata_cache,
528                &self.publisher_venue_map,
529                &sym_map,
530            )?;
531
532            let (data, _) = decode_record(
533                &record,
534                instrument_id,
535                price_precision,
536                None,
537                false, // Not applicable
538                timestamp_on_close,
539            )?;
540
541            match data {
542                Some(Data::Bar(bar)) => {
543                    result.push(bar);
544                }
545                _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
546            }
547        }
548
549        Ok(result)
550    }
551
552    /// Fetches imbalance data for the given parameters.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if the API request or data processing fails.
557    pub async fn get_range_imbalance(
558        &self,
559        params: RangeQueryParams,
560    ) -> anyhow::Result<Vec<DatabentoImbalance>> {
561        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
562        check_consistent_symbology(&symbols)?;
563
564        let first_symbol = params
565            .symbols
566            .first()
567            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
568        let stype_in = infer_symbology_type(first_symbol);
569        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
570        let time_range = get_date_time_range(params.start, end)?;
571
572        let range_params = GetRangeParams::builder()
573            .dataset(params.dataset)
574            .date_time_range(time_range)
575            .symbols(symbols)
576            .stype_in(stype_in)
577            .schema(dbn::Schema::Imbalance)
578            .limit(params.limit.and_then(NonZeroU64::new))
579            .build();
580
581        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
582
583        let mut client = self.inner.lock().await;
584        let mut decoder = client
585            .timeseries()
586            .get_range(&range_params)
587            .await
588            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
589
590        let metadata = decoder.metadata().clone();
591        let mut metadata_cache = MetadataCache::new(metadata);
592        let mut result: Vec<DatabentoImbalance> = Vec::new();
593
594        while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
595            let record = dbn::RecordRef::from(msg);
596            let sym_map = self
597                .symbol_venue_map
598                .read()
599                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
600            let instrument_id = decode_nautilus_instrument_id(
601                &record,
602                &mut metadata_cache,
603                &self.publisher_venue_map,
604                &sym_map,
605            )?;
606
607            let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
608            result.push(imbalance);
609        }
610
611        Ok(result)
612    }
613
614    /// Fetches statistics data for the given parameters.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if the API request or data processing fails.
619    pub async fn get_range_statistics(
620        &self,
621        params: RangeQueryParams,
622    ) -> anyhow::Result<Vec<DatabentoStatistics>> {
623        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
624        check_consistent_symbology(&symbols)?;
625
626        let first_symbol = params
627            .symbols
628            .first()
629            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
630        let stype_in = infer_symbology_type(first_symbol);
631        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
632        let time_range = get_date_time_range(params.start, end)?;
633
634        let range_params = GetRangeParams::builder()
635            .dataset(params.dataset)
636            .date_time_range(time_range)
637            .symbols(symbols)
638            .stype_in(stype_in)
639            .schema(dbn::Schema::Statistics)
640            .limit(params.limit.and_then(NonZeroU64::new))
641            .build();
642
643        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
644
645        let mut client = self.inner.lock().await;
646        let mut decoder = client
647            .timeseries()
648            .get_range(&range_params)
649            .await
650            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
651
652        let metadata = decoder.metadata().clone();
653        let mut metadata_cache = MetadataCache::new(metadata);
654        let mut result: Vec<DatabentoStatistics> = Vec::new();
655
656        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
657            let record = dbn::RecordRef::from(msg);
658            let sym_map = self
659                .symbol_venue_map
660                .read()
661                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
662            let instrument_id = decode_nautilus_instrument_id(
663                &record,
664                &mut metadata_cache,
665                &self.publisher_venue_map,
666                &sym_map,
667            )?;
668
669            let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
670            result.push(statistics);
671        }
672
673        Ok(result)
674    }
675
676    /// Fetches status data for the given parameters.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if the API request or data processing fails.
681    pub async fn get_range_status(
682        &self,
683        params: RangeQueryParams,
684    ) -> anyhow::Result<Vec<InstrumentStatus>> {
685        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
686        check_consistent_symbology(&symbols)?;
687
688        let first_symbol = params
689            .symbols
690            .first()
691            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
692        let stype_in = infer_symbology_type(first_symbol);
693        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
694        let time_range = get_date_time_range(params.start, end)?;
695
696        let range_params = GetRangeParams::builder()
697            .dataset(params.dataset)
698            .date_time_range(time_range)
699            .symbols(symbols)
700            .stype_in(stype_in)
701            .schema(dbn::Schema::Status)
702            .limit(params.limit.and_then(NonZeroU64::new))
703            .build();
704
705        let mut client = self.inner.lock().await;
706        let mut decoder = client
707            .timeseries()
708            .get_range(&range_params)
709            .await
710            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
711
712        let metadata = decoder.metadata().clone();
713        let mut metadata_cache = MetadataCache::new(metadata);
714        let mut result: Vec<InstrumentStatus> = Vec::new();
715
716        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
717            let record = dbn::RecordRef::from(msg);
718            let sym_map = self
719                .symbol_venue_map
720                .read()
721                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
722            let instrument_id = decode_nautilus_instrument_id(
723                &record,
724                &mut metadata_cache,
725                &self.publisher_venue_map,
726                &sym_map,
727            )?;
728
729            let status = decode_status_msg(msg, instrument_id, None)?;
730            result.push(status);
731        }
732
733        Ok(result)
734    }
735
736    /// Helper method to prepare symbols from instrument IDs.
737    ///
738    /// # Errors
739    ///
740    /// Returns an error if the symbol venue map lock is poisoned.
741    pub fn prepare_symbols_from_instrument_ids(
742        &self,
743        instrument_ids: &[InstrumentId],
744    ) -> anyhow::Result<Vec<String>> {
745        let mut symbol_venue_map = self
746            .symbol_venue_map
747            .write()
748            .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
749
750        let symbols: Vec<String> = instrument_ids
751            .iter()
752            .map(|instrument_id| {
753                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
754            })
755            .collect();
756
757        Ok(symbols)
758    }
759}