nautilus_databento/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core Databento historical client for both Rust and Python usage.
17
18use std::{
19    fs,
20    num::NonZeroU64,
21    path::PathBuf,
22    str::FromStr,
23    sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28    dbn::{self, decode::DbnMetadata},
29    historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34    data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
35    enums::BarAggregation,
36    identifiers::{InstrumentId, Symbol, Venue},
37    instruments::InstrumentAny,
38    types::Currency,
39};
40
41use crate::{
42    common::get_date_time_range,
43    decode::{
44        decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
45        decode_statistics_msg, decode_status_msg,
46    },
47    symbology::{
48        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
49        infer_symbology_type, instrument_id_to_symbol_string,
50    },
51    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
52};
53
/// Core Databento historical client for fetching historical market data.
///
/// This client provides both synchronous and asynchronous interfaces for fetching
/// various types of historical market data from Databento.
///
/// The underlying [`databento::HistoricalClient`] and both venue maps are held behind
/// `Arc`, so clones of this struct share the same client and maps.
#[derive(Debug, Clone)]
pub struct DatabentoHistoricalClient {
    /// The API key the underlying Databento client was built with.
    pub key: String,
    /// Clock used to supply the current time as the range end when a query has none.
    clock: &'static AtomicTime,
    /// The underlying Databento client, guarded by an async mutex.
    inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
    /// Publisher ID to venue mapping, loaded from the publishers file at construction.
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol to venue mapping, read while decoding records and passed mutably to
    /// `instrument_id_to_symbol_string` when preparing symbols.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// If set, instrument definitions resolved to the GLBX venue have their venue
    /// replaced by the one derived from the record's exchange code.
    use_exchange_as_venue: bool,
}
67
/// Parameters for range queries to Databento historical API.
#[derive(Debug)]
pub struct RangeQueryParams {
    /// The dataset name passed to the request as-is.
    pub dataset: String,
    /// The symbols to request; the symbology type is inferred from the first entry.
    pub symbols: Vec<String>,
    /// The start of the requested time range.
    pub start: UnixNanos,
    /// The end of the requested time range; the client's clock time is used when `None`.
    pub end: Option<UnixNanos>,
    /// The record limit; `None` and `Some(0)` are both passed to the request as `None`.
    pub limit: Option<u64>,
    /// The price precision used for decoding; falls back to the USD currency precision
    /// when `None` (not read by the instruments and status queries).
    pub price_precision: Option<u8>,
}
78
/// Result containing dataset date range information.
///
/// Both bounds are the string renderings (`to_string()`) of the values returned by the
/// Databento metadata API.
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// The start of the dataset range as reported by the API.
    pub start: String,
    /// The end of the dataset range as reported by the API.
    pub end: String,
}
85
86impl DatabentoHistoricalClient {
87    /// Creates a new [`DatabentoHistoricalClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if client creation or publisher loading fails.
92    pub fn new(
93        key: String,
94        publishers_filepath: PathBuf,
95        clock: &'static AtomicTime,
96        use_exchange_as_venue: bool,
97    ) -> anyhow::Result<Self> {
98        let client = databento::HistoricalClient::builder()
99            .user_agent_extension(NAUTILUS_USER_AGENT.into())
100            .key(key.clone())
101            .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
102            .build()
103            .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
104
105        let file_content = fs::read_to_string(publishers_filepath)?;
106        let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
107
108        let publisher_venue_map = publishers_vec
109            .into_iter()
110            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
111            .collect::<IndexMap<u16, Venue>>();
112
113        Ok(Self {
114            clock,
115            inner: Arc::new(tokio::sync::Mutex::new(client)),
116            publisher_venue_map: Arc::new(publisher_venue_map),
117            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
118            key,
119            use_exchange_as_venue,
120        })
121    }
122
123    /// Gets the date range for a specific dataset.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the API request fails.
128    pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
129        let mut client = self.inner.lock().await;
130        let response = client
131            .metadata()
132            .get_dataset_range(dataset)
133            .await
134            .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
135
136        Ok(DatasetRange {
137            start: response.start.to_string(),
138            end: response.end.to_string(),
139        })
140    }
141
142    /// Fetches instrument definitions for the given parameters.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the API request or data processing fails.
147    pub async fn get_range_instruments(
148        &self,
149        params: RangeQueryParams,
150    ) -> anyhow::Result<Vec<InstrumentAny>> {
151        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
152        check_consistent_symbology(&symbols)?;
153
154        let first_symbol = params
155            .symbols
156            .first()
157            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
158        let stype_in = infer_symbology_type(first_symbol);
159        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
160        let time_range = get_date_time_range(params.start, end)?;
161
162        let range_params = GetRangeParams::builder()
163            .dataset(params.dataset)
164            .date_time_range(time_range)
165            .symbols(symbols)
166            .stype_in(stype_in)
167            .schema(dbn::Schema::Definition)
168            .limit(params.limit.and_then(NonZeroU64::new))
169            .build();
170
171        let mut client = self.inner.lock().await;
172        let mut decoder = client
173            .timeseries()
174            .get_range(&range_params)
175            .await
176            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
177
178        let metadata = decoder.metadata().clone();
179        let mut metadata_cache = MetadataCache::new(metadata);
180        let mut instruments = Vec::new();
181
182        while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183            let record = dbn::RecordRef::from(msg);
184            let sym_map = self
185                .symbol_venue_map
186                .read()
187                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
188            let mut instrument_id = decode_nautilus_instrument_id(
189                &record,
190                &mut metadata_cache,
191                &self.publisher_venue_map,
192                &sym_map,
193            )?;
194
195            if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
196                let exchange = msg
197                    .exchange()
198                    .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
199                let venue = Venue::from_code(exchange)
200                    .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
201                instrument_id.venue = venue;
202            }
203
204            match decode_instrument_def_msg(msg, instrument_id, None) {
205                Ok(instrument) => instruments.push(instrument),
206                Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
207            }
208        }
209
210        Ok(instruments)
211    }
212
213    /// Fetches quote ticks for the given parameters.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the API request or data processing fails.
218    pub async fn get_range_quotes(
219        &self,
220        params: RangeQueryParams,
221        schema: Option<String>,
222    ) -> anyhow::Result<Vec<QuoteTick>> {
223        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
224        check_consistent_symbology(&symbols)?;
225
226        let first_symbol = params
227            .symbols
228            .first()
229            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
230        let stype_in = infer_symbology_type(first_symbol);
231        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
232        let time_range = get_date_time_range(params.start, end)?;
233        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
234        let dbn_schema = dbn::Schema::from_str(&schema)?;
235
236        match dbn_schema {
237            dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
238            _ => anyhow::bail!("Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m"),
239        }
240
241        let range_params = GetRangeParams::builder()
242            .dataset(params.dataset)
243            .date_time_range(time_range)
244            .symbols(symbols)
245            .stype_in(stype_in)
246            .schema(dbn_schema)
247            .limit(params.limit.and_then(NonZeroU64::new))
248            .build();
249
250        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
251
252        let mut client = self.inner.lock().await;
253        let mut decoder = client
254            .timeseries()
255            .get_range(&range_params)
256            .await
257            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
258
259        let metadata = decoder.metadata().clone();
260        let mut metadata_cache = MetadataCache::new(metadata);
261        let mut result: Vec<QuoteTick> = Vec::new();
262
263        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
264            let sym_map = self
265                .symbol_venue_map
266                .read()
267                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
268            let instrument_id = decode_nautilus_instrument_id(
269                &record,
270                &mut metadata_cache,
271                &self.publisher_venue_map,
272                &sym_map,
273            )?;
274
275            let (data, _) = decode_record(
276                &record,
277                instrument_id,
278                price_precision,
279                None,
280                false, // Don't include trades
281                true,
282            )?;
283
284            match data {
285                Some(Data::Quote(quote)) => result.push(quote),
286                None => {} // Skip records with undefined bid/ask prices
287                _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
288            }
289            Ok(())
290        };
291
292        match dbn_schema {
293            dbn::Schema::Mbp1 => {
294                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
295                    process_record(dbn::RecordRef::from(msg))?;
296                }
297            }
298            dbn::Schema::Bbo1M => {
299                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
300                    process_record(dbn::RecordRef::from(msg))?;
301                }
302            }
303            dbn::Schema::Bbo1S => {
304                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
305                    process_record(dbn::RecordRef::from(msg))?;
306                }
307            }
308            _ => anyhow::bail!("Invalid schema {dbn_schema}"),
309        }
310
311        Ok(result)
312    }
313
314    /// Fetches order book depth10 snapshots for the given parameters.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if the API request or data processing fails.
319    pub async fn get_range_order_book_depth10(
320        &self,
321        params: RangeQueryParams,
322        depth: Option<usize>,
323    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
324        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
325        check_consistent_symbology(&symbols)?;
326
327        let first_symbol = params
328            .symbols
329            .first()
330            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
331        let stype_in = infer_symbology_type(first_symbol);
332        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
333        let time_range = get_date_time_range(params.start, end)?;
334
335        // For now, only support MBP_10 schema for depth 10
336        let _depth = depth.unwrap_or(10);
337        if _depth != 10 {
338            anyhow::bail!("Only depth=10 is currently supported for order book depths");
339        }
340
341        let range_params = GetRangeParams::builder()
342            .dataset(params.dataset)
343            .date_time_range(time_range)
344            .symbols(symbols)
345            .stype_in(stype_in)
346            .schema(dbn::Schema::Mbp10)
347            .limit(params.limit.and_then(NonZeroU64::new))
348            .build();
349
350        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
351
352        let mut client = self.inner.lock().await;
353        let mut decoder = client
354            .timeseries()
355            .get_range(&range_params)
356            .await
357            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
358
359        let metadata = decoder.metadata().clone();
360        let mut metadata_cache = MetadataCache::new(metadata);
361        let mut result: Vec<OrderBookDepth10> = Vec::new();
362
363        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
364            let sym_map = self
365                .symbol_venue_map
366                .read()
367                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
368            let instrument_id = decode_nautilus_instrument_id(
369                &record,
370                &mut metadata_cache,
371                &self.publisher_venue_map,
372                &sym_map,
373            )?;
374
375            if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
376                let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
377                result.push(depth);
378            }
379
380            Ok(())
381        };
382
383        while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
384            process_record(dbn::RecordRef::from(msg))?;
385        }
386
387        Ok(result)
388    }
389
390    /// Fetches trade ticks for the given parameters.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the API request or data processing fails.
395    pub async fn get_range_trades(
396        &self,
397        params: RangeQueryParams,
398    ) -> anyhow::Result<Vec<TradeTick>> {
399        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
400        check_consistent_symbology(&symbols)?;
401
402        let first_symbol = params
403            .symbols
404            .first()
405            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
406        let stype_in = infer_symbology_type(first_symbol);
407        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
408        let time_range = get_date_time_range(params.start, end)?;
409
410        let range_params = GetRangeParams::builder()
411            .dataset(params.dataset)
412            .date_time_range(time_range)
413            .symbols(symbols)
414            .stype_in(stype_in)
415            .schema(dbn::Schema::Trades)
416            .limit(params.limit.and_then(NonZeroU64::new))
417            .build();
418
419        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
420
421        let mut client = self.inner.lock().await;
422        let mut decoder = client
423            .timeseries()
424            .get_range(&range_params)
425            .await
426            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
427
428        let metadata = decoder.metadata().clone();
429        let mut metadata_cache = MetadataCache::new(metadata);
430        let mut result: Vec<TradeTick> = Vec::new();
431
432        while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
433            let record = dbn::RecordRef::from(msg);
434            let sym_map = self
435                .symbol_venue_map
436                .read()
437                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
438            let instrument_id = decode_nautilus_instrument_id(
439                &record,
440                &mut metadata_cache,
441                &self.publisher_venue_map,
442                &sym_map,
443            )?;
444
445            let (data, _) = decode_record(
446                &record,
447                instrument_id,
448                price_precision,
449                None,
450                false, // Not applicable (trade will be decoded regardless)
451                true,
452            )?;
453
454            match data {
455                Some(Data::Trade(trade)) => {
456                    result.push(trade);
457                }
458                _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
459            }
460        }
461
462        Ok(result)
463    }
464
465    /// Fetches bars for the given parameters.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the API request or data processing fails.
470    pub async fn get_range_bars(
471        &self,
472        params: RangeQueryParams,
473        aggregation: BarAggregation,
474        timestamp_on_close: bool,
475    ) -> anyhow::Result<Vec<Bar>> {
476        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
477        check_consistent_symbology(&symbols)?;
478
479        let first_symbol = params
480            .symbols
481            .first()
482            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
483        let stype_in = infer_symbology_type(first_symbol);
484        let schema = match aggregation {
485            BarAggregation::Second => dbn::Schema::Ohlcv1S,
486            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
487            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
488            BarAggregation::Day => dbn::Schema::Ohlcv1D,
489            _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
490        };
491
492        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
493        let time_range = get_date_time_range(params.start, end)?;
494
495        let range_params = GetRangeParams::builder()
496            .dataset(params.dataset)
497            .date_time_range(time_range)
498            .symbols(symbols)
499            .stype_in(stype_in)
500            .schema(schema)
501            .limit(params.limit.and_then(NonZeroU64::new))
502            .build();
503
504        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
505
506        let mut client = self.inner.lock().await;
507        let mut decoder = client
508            .timeseries()
509            .get_range(&range_params)
510            .await
511            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
512
513        let metadata = decoder.metadata().clone();
514        let mut metadata_cache = MetadataCache::new(metadata);
515        let mut result: Vec<Bar> = Vec::new();
516
517        while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
518            let record = dbn::RecordRef::from(msg);
519            let sym_map = self
520                .symbol_venue_map
521                .read()
522                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
523            let instrument_id = decode_nautilus_instrument_id(
524                &record,
525                &mut metadata_cache,
526                &self.publisher_venue_map,
527                &sym_map,
528            )?;
529
530            let (data, _) = decode_record(
531                &record,
532                instrument_id,
533                price_precision,
534                None,
535                false, // Not applicable
536                timestamp_on_close,
537            )?;
538
539            match data {
540                Some(Data::Bar(bar)) => {
541                    result.push(bar);
542                }
543                _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
544            }
545        }
546
547        Ok(result)
548    }
549
550    /// Fetches imbalance data for the given parameters.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if the API request or data processing fails.
555    pub async fn get_range_imbalance(
556        &self,
557        params: RangeQueryParams,
558    ) -> anyhow::Result<Vec<DatabentoImbalance>> {
559        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
560        check_consistent_symbology(&symbols)?;
561
562        let first_symbol = params
563            .symbols
564            .first()
565            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
566        let stype_in = infer_symbology_type(first_symbol);
567        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
568        let time_range = get_date_time_range(params.start, end)?;
569
570        let range_params = GetRangeParams::builder()
571            .dataset(params.dataset)
572            .date_time_range(time_range)
573            .symbols(symbols)
574            .stype_in(stype_in)
575            .schema(dbn::Schema::Imbalance)
576            .limit(params.limit.and_then(NonZeroU64::new))
577            .build();
578
579        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
580
581        let mut client = self.inner.lock().await;
582        let mut decoder = client
583            .timeseries()
584            .get_range(&range_params)
585            .await
586            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
587
588        let metadata = decoder.metadata().clone();
589        let mut metadata_cache = MetadataCache::new(metadata);
590        let mut result: Vec<DatabentoImbalance> = Vec::new();
591
592        while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
593            let record = dbn::RecordRef::from(msg);
594            let sym_map = self
595                .symbol_venue_map
596                .read()
597                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
598            let instrument_id = decode_nautilus_instrument_id(
599                &record,
600                &mut metadata_cache,
601                &self.publisher_venue_map,
602                &sym_map,
603            )?;
604
605            let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
606            result.push(imbalance);
607        }
608
609        Ok(result)
610    }
611
612    /// Fetches statistics data for the given parameters.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if the API request or data processing fails.
617    pub async fn get_range_statistics(
618        &self,
619        params: RangeQueryParams,
620    ) -> anyhow::Result<Vec<DatabentoStatistics>> {
621        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
622        check_consistent_symbology(&symbols)?;
623
624        let first_symbol = params
625            .symbols
626            .first()
627            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
628        let stype_in = infer_symbology_type(first_symbol);
629        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
630        let time_range = get_date_time_range(params.start, end)?;
631
632        let range_params = GetRangeParams::builder()
633            .dataset(params.dataset)
634            .date_time_range(time_range)
635            .symbols(symbols)
636            .stype_in(stype_in)
637            .schema(dbn::Schema::Statistics)
638            .limit(params.limit.and_then(NonZeroU64::new))
639            .build();
640
641        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
642
643        let mut client = self.inner.lock().await;
644        let mut decoder = client
645            .timeseries()
646            .get_range(&range_params)
647            .await
648            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
649
650        let metadata = decoder.metadata().clone();
651        let mut metadata_cache = MetadataCache::new(metadata);
652        let mut result: Vec<DatabentoStatistics> = Vec::new();
653
654        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
655            let record = dbn::RecordRef::from(msg);
656            let sym_map = self
657                .symbol_venue_map
658                .read()
659                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
660            let instrument_id = decode_nautilus_instrument_id(
661                &record,
662                &mut metadata_cache,
663                &self.publisher_venue_map,
664                &sym_map,
665            )?;
666
667            let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
668            result.push(statistics);
669        }
670
671        Ok(result)
672    }
673
674    /// Fetches status data for the given parameters.
675    ///
676    /// # Errors
677    ///
678    /// Returns an error if the API request or data processing fails.
679    pub async fn get_range_status(
680        &self,
681        params: RangeQueryParams,
682    ) -> anyhow::Result<Vec<InstrumentStatus>> {
683        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
684        check_consistent_symbology(&symbols)?;
685
686        let first_symbol = params
687            .symbols
688            .first()
689            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
690        let stype_in = infer_symbology_type(first_symbol);
691        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
692        let time_range = get_date_time_range(params.start, end)?;
693
694        let range_params = GetRangeParams::builder()
695            .dataset(params.dataset)
696            .date_time_range(time_range)
697            .symbols(symbols)
698            .stype_in(stype_in)
699            .schema(dbn::Schema::Status)
700            .limit(params.limit.and_then(NonZeroU64::new))
701            .build();
702
703        let mut client = self.inner.lock().await;
704        let mut decoder = client
705            .timeseries()
706            .get_range(&range_params)
707            .await
708            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
709
710        let metadata = decoder.metadata().clone();
711        let mut metadata_cache = MetadataCache::new(metadata);
712        let mut result: Vec<InstrumentStatus> = Vec::new();
713
714        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
715            let record = dbn::RecordRef::from(msg);
716            let sym_map = self
717                .symbol_venue_map
718                .read()
719                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
720            let instrument_id = decode_nautilus_instrument_id(
721                &record,
722                &mut metadata_cache,
723                &self.publisher_venue_map,
724                &sym_map,
725            )?;
726
727            let status = decode_status_msg(msg, instrument_id, None)?;
728            result.push(status);
729        }
730
731        Ok(result)
732    }
733
734    /// Helper method to prepare symbols from instrument IDs.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the symbol venue map lock is poisoned.
739    pub fn prepare_symbols_from_instrument_ids(
740        &self,
741        instrument_ids: &[InstrumentId],
742    ) -> anyhow::Result<Vec<String>> {
743        let mut symbol_venue_map = self
744            .symbol_venue_map
745            .write()
746            .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
747
748        let symbols: Vec<String> = instrument_ids
749            .iter()
750            .map(|instrument_id| {
751                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
752            })
753            .collect();
754
755        Ok(symbols)
756    }
757}