nautilus_databento/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core Databento historical client for both Rust and Python usage.
17
18use std::{
19    fs,
20    num::NonZeroU64,
21    path::PathBuf,
22    str::FromStr,
23    sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28    dbn::{self, decode::DbnMetadata},
29    historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34    data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
35    enums::BarAggregation,
36    identifiers::{InstrumentId, Symbol, Venue},
37    instruments::InstrumentAny,
38    types::Currency,
39};
40
41use crate::{
42    common::get_date_time_range,
43    decode::{
44        decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
45        decode_statistics_msg, decode_status_msg,
46    },
47    symbology::{
48        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
49        infer_symbology_type, instrument_id_to_symbol_string,
50    },
51    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
52};
53
54/// Core Databento historical client for fetching historical market data.
55///
56/// This client provides both synchronous and asynchronous interfaces for fetching
57/// various types of historical market data from Databento.
58#[derive(Debug, Clone)]
59pub struct DatabentoHistoricalClient {
60    pub key: String,
61    clock: &'static AtomicTime,
62    inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
63    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
64    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
65    use_exchange_as_venue: bool,
66}
67
68/// Parameters for range queries to Databento historical API.
69#[derive(Debug)]
70pub struct RangeQueryParams {
71    pub dataset: String,
72    pub symbols: Vec<String>,
73    pub start: UnixNanos,
74    pub end: Option<UnixNanos>,
75    pub limit: Option<u64>,
76    pub price_precision: Option<u8>,
77}
78
/// Result containing dataset date range information.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatasetRange {
    /// Start of the dataset's available range, rendered as a string.
    pub start: String,
    /// End of the dataset's available range, rendered as a string.
    pub end: String,
}
85
86impl DatabentoHistoricalClient {
87    /// Creates a new [`DatabentoHistoricalClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if client creation or publisher loading fails.
92    pub fn new(
93        key: String,
94        publishers_filepath: PathBuf,
95        clock: &'static AtomicTime,
96        use_exchange_as_venue: bool,
97    ) -> anyhow::Result<Self> {
98        let client = databento::HistoricalClient::builder()
99            .user_agent_extension(NAUTILUS_USER_AGENT.into())
100            .key(key.clone())
101            .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
102            .build()
103            .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
104
105        let file_content = fs::read_to_string(publishers_filepath)?;
106        let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
107
108        let publisher_venue_map = publishers_vec
109            .into_iter()
110            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
111            .collect::<IndexMap<u16, Venue>>();
112
113        Ok(Self {
114            clock,
115            inner: Arc::new(tokio::sync::Mutex::new(client)),
116            publisher_venue_map: Arc::new(publisher_venue_map),
117            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
118            key,
119            use_exchange_as_venue,
120        })
121    }
122
123    /// Gets the date range for a specific dataset.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the API request fails.
128    pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
129        let mut client = self.inner.lock().await;
130        let response = client
131            .metadata()
132            .get_dataset_range(dataset)
133            .await
134            .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
135
136        Ok(DatasetRange {
137            start: response.start.to_string(),
138            end: response.end.to_string(),
139        })
140    }
141
142    /// Fetches instrument definitions for the given parameters.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the API request or data processing fails.
147    pub async fn get_range_instruments(
148        &self,
149        params: RangeQueryParams,
150    ) -> anyhow::Result<Vec<InstrumentAny>> {
151        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
152        check_consistent_symbology(&symbols)?;
153
154        let first_symbol = params
155            .symbols
156            .first()
157            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
158        let stype_in = infer_symbology_type(first_symbol);
159        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
160        let time_range = get_date_time_range(params.start, end)?;
161
162        let range_params = GetRangeParams::builder()
163            .dataset(params.dataset)
164            .date_time_range(time_range)
165            .symbols(symbols)
166            .stype_in(stype_in)
167            .schema(dbn::Schema::Definition)
168            .limit(params.limit.and_then(NonZeroU64::new))
169            .build();
170
171        let mut client = self.inner.lock().await;
172        let mut decoder = client
173            .timeseries()
174            .get_range(&range_params)
175            .await
176            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
177
178        let metadata = decoder.metadata().clone();
179        let mut metadata_cache = MetadataCache::new(metadata);
180        let mut instruments = Vec::new();
181
182        while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183            let record = dbn::RecordRef::from(msg);
184            let sym_map = self
185                .symbol_venue_map
186                .read()
187                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
188            let mut instrument_id = decode_nautilus_instrument_id(
189                &record,
190                &mut metadata_cache,
191                &self.publisher_venue_map,
192                &sym_map,
193            )?;
194
195            if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
196                let exchange = msg
197                    .exchange()
198                    .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
199                let venue = Venue::from_code(exchange)
200                    .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
201                instrument_id.venue = venue;
202            }
203
204            match decode_instrument_def_msg(msg, instrument_id, None) {
205                Ok(instrument) => instruments.push(instrument),
206                Err(e) => log::error!("Failed to decode instrument: {e:?}"),
207            }
208        }
209
210        Ok(instruments)
211    }
212
213    /// Fetches quote ticks for the given parameters.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the API request or data processing fails.
218    pub async fn get_range_quotes(
219        &self,
220        params: RangeQueryParams,
221        schema: Option<String>,
222    ) -> anyhow::Result<Vec<QuoteTick>> {
223        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
224        check_consistent_symbology(&symbols)?;
225
226        let first_symbol = params
227            .symbols
228            .first()
229            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
230        let stype_in = infer_symbology_type(first_symbol);
231        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
232        let time_range = get_date_time_range(params.start, end)?;
233        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
234        let dbn_schema = dbn::Schema::from_str(&schema)?;
235
236        match dbn_schema {
237            dbn::Schema::Mbp1
238            | dbn::Schema::Bbo1S
239            | dbn::Schema::Bbo1M
240            | dbn::Schema::Cmbp1
241            | dbn::Schema::Cbbo1S
242            | dbn::Schema::Cbbo1M => (),
243            _ => anyhow::bail!(
244                "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
245            ),
246        }
247
248        let range_params = GetRangeParams::builder()
249            .dataset(params.dataset)
250            .date_time_range(time_range)
251            .symbols(symbols)
252            .stype_in(stype_in)
253            .schema(dbn_schema)
254            .limit(params.limit.and_then(NonZeroU64::new))
255            .build();
256
257        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
258
259        let mut client = self.inner.lock().await;
260        let mut decoder = client
261            .timeseries()
262            .get_range(&range_params)
263            .await
264            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
265
266        let metadata = decoder.metadata().clone();
267        let mut metadata_cache = MetadataCache::new(metadata);
268        let mut result: Vec<QuoteTick> = Vec::new();
269
270        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
271            let sym_map = self
272                .symbol_venue_map
273                .read()
274                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
275            let instrument_id = decode_nautilus_instrument_id(
276                &record,
277                &mut metadata_cache,
278                &self.publisher_venue_map,
279                &sym_map,
280            )?;
281
282            let (data, _) = decode_record(
283                &record,
284                instrument_id,
285                price_precision,
286                None,
287                false, // Don't include trades
288                true,
289            )?;
290
291            match data {
292                Some(Data::Quote(quote)) => result.push(quote),
293                None => {} // Skip records with undefined bid/ask prices
294                _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
295            }
296            Ok(())
297        };
298
299        match dbn_schema {
300            dbn::Schema::Mbp1 => {
301                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
302                    process_record(dbn::RecordRef::from(msg))?;
303                }
304            }
305            dbn::Schema::Cmbp1 => {
306                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Cmbp1Msg>().await {
307                    process_record(dbn::RecordRef::from(msg))?;
308                }
309            }
310            dbn::Schema::Bbo1M => {
311                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
312                    process_record(dbn::RecordRef::from(msg))?;
313                }
314            }
315            dbn::Schema::Bbo1S => {
316                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
317                    process_record(dbn::RecordRef::from(msg))?;
318                }
319            }
320            dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
321                while let Ok(Some(msg)) = decoder.decode_record::<dbn::CbboMsg>().await {
322                    process_record(dbn::RecordRef::from(msg))?;
323                }
324            }
325            _ => anyhow::bail!("Invalid schema {dbn_schema}"),
326        }
327
328        Ok(result)
329    }
330
331    /// Fetches order book depth10 snapshots for the given parameters.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the API request or data processing fails.
336    pub async fn get_range_order_book_depth10(
337        &self,
338        params: RangeQueryParams,
339        depth: Option<usize>,
340    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
341        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
342        check_consistent_symbology(&symbols)?;
343
344        let first_symbol = params
345            .symbols
346            .first()
347            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
348        let stype_in = infer_symbology_type(first_symbol);
349        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
350        let time_range = get_date_time_range(params.start, end)?;
351
352        // For now, only support MBP_10 schema for depth 10
353        let _depth = depth.unwrap_or(10);
354        if _depth != 10 {
355            anyhow::bail!("Only depth=10 is currently supported for order book depths");
356        }
357
358        let range_params = GetRangeParams::builder()
359            .dataset(params.dataset)
360            .date_time_range(time_range)
361            .symbols(symbols)
362            .stype_in(stype_in)
363            .schema(dbn::Schema::Mbp10)
364            .limit(params.limit.and_then(NonZeroU64::new))
365            .build();
366
367        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
368
369        let mut client = self.inner.lock().await;
370        let mut decoder = client
371            .timeseries()
372            .get_range(&range_params)
373            .await
374            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
375
376        let metadata = decoder.metadata().clone();
377        let mut metadata_cache = MetadataCache::new(metadata);
378        let mut result: Vec<OrderBookDepth10> = Vec::new();
379
380        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
381            let sym_map = self
382                .symbol_venue_map
383                .read()
384                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
385            let instrument_id = decode_nautilus_instrument_id(
386                &record,
387                &mut metadata_cache,
388                &self.publisher_venue_map,
389                &sym_map,
390            )?;
391
392            if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
393                let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
394                result.push(depth);
395            }
396
397            Ok(())
398        };
399
400        while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
401            process_record(dbn::RecordRef::from(msg))?;
402        }
403
404        Ok(result)
405    }
406
407    /// Fetches trade ticks for the given parameters.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the API request or data processing fails.
412    pub async fn get_range_trades(
413        &self,
414        params: RangeQueryParams,
415    ) -> anyhow::Result<Vec<TradeTick>> {
416        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
417        check_consistent_symbology(&symbols)?;
418
419        let first_symbol = params
420            .symbols
421            .first()
422            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
423        let stype_in = infer_symbology_type(first_symbol);
424        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
425        let time_range = get_date_time_range(params.start, end)?;
426
427        let range_params = GetRangeParams::builder()
428            .dataset(params.dataset)
429            .date_time_range(time_range)
430            .symbols(symbols)
431            .stype_in(stype_in)
432            .schema(dbn::Schema::Trades)
433            .limit(params.limit.and_then(NonZeroU64::new))
434            .build();
435
436        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
437
438        let mut client = self.inner.lock().await;
439        let mut decoder = client
440            .timeseries()
441            .get_range(&range_params)
442            .await
443            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
444
445        let metadata = decoder.metadata().clone();
446        let mut metadata_cache = MetadataCache::new(metadata);
447        let mut result: Vec<TradeTick> = Vec::new();
448
449        while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
450            let record = dbn::RecordRef::from(msg);
451            let sym_map = self
452                .symbol_venue_map
453                .read()
454                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
455            let instrument_id = decode_nautilus_instrument_id(
456                &record,
457                &mut metadata_cache,
458                &self.publisher_venue_map,
459                &sym_map,
460            )?;
461
462            let (data, _) = decode_record(
463                &record,
464                instrument_id,
465                price_precision,
466                None,
467                false, // Not applicable (trade will be decoded regardless)
468                true,
469            )?;
470
471            match data {
472                Some(Data::Trade(trade)) => {
473                    result.push(trade);
474                }
475                _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
476            }
477        }
478
479        Ok(result)
480    }
481
482    /// Fetches bars for the given parameters.
483    ///
484    /// # Errors
485    ///
486    /// Returns an error if the API request or data processing fails.
487    pub async fn get_range_bars(
488        &self,
489        params: RangeQueryParams,
490        aggregation: BarAggregation,
491        timestamp_on_close: bool,
492    ) -> anyhow::Result<Vec<Bar>> {
493        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
494        check_consistent_symbology(&symbols)?;
495
496        let first_symbol = params
497            .symbols
498            .first()
499            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
500        let stype_in = infer_symbology_type(first_symbol);
501        let schema = match aggregation {
502            BarAggregation::Second => dbn::Schema::Ohlcv1S,
503            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
504            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
505            BarAggregation::Day => dbn::Schema::Ohlcv1D,
506            _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
507        };
508
509        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
510        let time_range = get_date_time_range(params.start, end)?;
511
512        let range_params = GetRangeParams::builder()
513            .dataset(params.dataset)
514            .date_time_range(time_range)
515            .symbols(symbols)
516            .stype_in(stype_in)
517            .schema(schema)
518            .limit(params.limit.and_then(NonZeroU64::new))
519            .build();
520
521        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
522
523        let mut client = self.inner.lock().await;
524        let mut decoder = client
525            .timeseries()
526            .get_range(&range_params)
527            .await
528            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
529
530        let metadata = decoder.metadata().clone();
531        let mut metadata_cache = MetadataCache::new(metadata);
532        let mut result: Vec<Bar> = Vec::new();
533
534        while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
535            let record = dbn::RecordRef::from(msg);
536            let sym_map = self
537                .symbol_venue_map
538                .read()
539                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
540            let instrument_id = decode_nautilus_instrument_id(
541                &record,
542                &mut metadata_cache,
543                &self.publisher_venue_map,
544                &sym_map,
545            )?;
546
547            let (data, _) = decode_record(
548                &record,
549                instrument_id,
550                price_precision,
551                None,
552                false, // Not applicable
553                timestamp_on_close,
554            )?;
555
556            match data {
557                Some(Data::Bar(bar)) => {
558                    result.push(bar);
559                }
560                _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
561            }
562        }
563
564        Ok(result)
565    }
566
567    /// Fetches imbalance data for the given parameters.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the API request or data processing fails.
572    pub async fn get_range_imbalance(
573        &self,
574        params: RangeQueryParams,
575    ) -> anyhow::Result<Vec<DatabentoImbalance>> {
576        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
577        check_consistent_symbology(&symbols)?;
578
579        let first_symbol = params
580            .symbols
581            .first()
582            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
583        let stype_in = infer_symbology_type(first_symbol);
584        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
585        let time_range = get_date_time_range(params.start, end)?;
586
587        let range_params = GetRangeParams::builder()
588            .dataset(params.dataset)
589            .date_time_range(time_range)
590            .symbols(symbols)
591            .stype_in(stype_in)
592            .schema(dbn::Schema::Imbalance)
593            .limit(params.limit.and_then(NonZeroU64::new))
594            .build();
595
596        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
597
598        let mut client = self.inner.lock().await;
599        let mut decoder = client
600            .timeseries()
601            .get_range(&range_params)
602            .await
603            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
604
605        let metadata = decoder.metadata().clone();
606        let mut metadata_cache = MetadataCache::new(metadata);
607        let mut result: Vec<DatabentoImbalance> = Vec::new();
608
609        while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
610            let record = dbn::RecordRef::from(msg);
611            let sym_map = self
612                .symbol_venue_map
613                .read()
614                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
615            let instrument_id = decode_nautilus_instrument_id(
616                &record,
617                &mut metadata_cache,
618                &self.publisher_venue_map,
619                &sym_map,
620            )?;
621
622            let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
623            result.push(imbalance);
624        }
625
626        Ok(result)
627    }
628
629    /// Fetches statistics data for the given parameters.
630    ///
631    /// # Errors
632    ///
633    /// Returns an error if the API request or data processing fails.
634    pub async fn get_range_statistics(
635        &self,
636        params: RangeQueryParams,
637    ) -> anyhow::Result<Vec<DatabentoStatistics>> {
638        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
639        check_consistent_symbology(&symbols)?;
640
641        let first_symbol = params
642            .symbols
643            .first()
644            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
645        let stype_in = infer_symbology_type(first_symbol);
646        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
647        let time_range = get_date_time_range(params.start, end)?;
648
649        let range_params = GetRangeParams::builder()
650            .dataset(params.dataset)
651            .date_time_range(time_range)
652            .symbols(symbols)
653            .stype_in(stype_in)
654            .schema(dbn::Schema::Statistics)
655            .limit(params.limit.and_then(NonZeroU64::new))
656            .build();
657
658        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
659
660        let mut client = self.inner.lock().await;
661        let mut decoder = client
662            .timeseries()
663            .get_range(&range_params)
664            .await
665            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
666
667        let metadata = decoder.metadata().clone();
668        let mut metadata_cache = MetadataCache::new(metadata);
669        let mut result: Vec<DatabentoStatistics> = Vec::new();
670
671        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
672            let record = dbn::RecordRef::from(msg);
673            let sym_map = self
674                .symbol_venue_map
675                .read()
676                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
677            let instrument_id = decode_nautilus_instrument_id(
678                &record,
679                &mut metadata_cache,
680                &self.publisher_venue_map,
681                &sym_map,
682            )?;
683
684            let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
685            result.push(statistics);
686        }
687
688        Ok(result)
689    }
690
691    /// Fetches status data for the given parameters.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the API request or data processing fails.
696    pub async fn get_range_status(
697        &self,
698        params: RangeQueryParams,
699    ) -> anyhow::Result<Vec<InstrumentStatus>> {
700        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
701        check_consistent_symbology(&symbols)?;
702
703        let first_symbol = params
704            .symbols
705            .first()
706            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
707        let stype_in = infer_symbology_type(first_symbol);
708        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
709        let time_range = get_date_time_range(params.start, end)?;
710
711        let range_params = GetRangeParams::builder()
712            .dataset(params.dataset)
713            .date_time_range(time_range)
714            .symbols(symbols)
715            .stype_in(stype_in)
716            .schema(dbn::Schema::Status)
717            .limit(params.limit.and_then(NonZeroU64::new))
718            .build();
719
720        let mut client = self.inner.lock().await;
721        let mut decoder = client
722            .timeseries()
723            .get_range(&range_params)
724            .await
725            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
726
727        let metadata = decoder.metadata().clone();
728        let mut metadata_cache = MetadataCache::new(metadata);
729        let mut result: Vec<InstrumentStatus> = Vec::new();
730
731        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
732            let record = dbn::RecordRef::from(msg);
733            let sym_map = self
734                .symbol_venue_map
735                .read()
736                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
737            let instrument_id = decode_nautilus_instrument_id(
738                &record,
739                &mut metadata_cache,
740                &self.publisher_venue_map,
741                &sym_map,
742            )?;
743
744            let status = decode_status_msg(msg, instrument_id, None)?;
745            result.push(status);
746        }
747
748        Ok(result)
749    }
750
751    /// Helper method to prepare symbols from instrument IDs.
752    ///
753    /// # Errors
754    ///
755    /// Returns an error if the symbol venue map lock is poisoned.
756    pub fn prepare_symbols_from_instrument_ids(
757        &self,
758        instrument_ids: &[InstrumentId],
759    ) -> anyhow::Result<Vec<String>> {
760        let mut symbol_venue_map = self
761            .symbol_venue_map
762            .write()
763            .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
764
765        let symbols: Vec<String> = instrument_ids
766            .iter()
767            .map(|instrument_id| {
768                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
769            })
770            .collect();
771
772        Ok(symbols)
773    }
774}