Skip to main content

nautilus_databento/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core Databento historical client for both Rust and Python usage.
17
18use std::{
19    fs,
20    num::NonZeroU64,
21    path::PathBuf,
22    str::FromStr,
23    sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28    dbn::{self, decode::DbnMetadata},
29    historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
35    enums::BarAggregation,
36    identifiers::{InstrumentId, Symbol, Venue},
37    instruments::InstrumentAny,
38    types::Currency,
39};
40
41use crate::{
42    common::get_date_time_range,
43    decode::{
44        decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
45        decode_record, decode_statistics_msg, decode_status_msg,
46    },
47    symbology::{
48        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
49        infer_symbology_type, instrument_id_to_symbol_string,
50    },
51    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
52};
53
54/// Core Databento historical client for fetching historical market data.
55///
56/// This client provides both synchronous and asynchronous interfaces for fetching
57/// various types of historical market data from Databento.
58#[derive(Debug, Clone)]
59pub struct DatabentoHistoricalClient {
60    pub key: String,
61    clock: &'static AtomicTime,
62    inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
63    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
64    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
65    use_exchange_as_venue: bool,
66}
67
68/// Parameters for range queries to Databento historical API.
69#[derive(Debug)]
70pub struct RangeQueryParams {
71    pub dataset: String,
72    pub symbols: Vec<String>,
73    pub start: UnixNanos,
74    pub end: Option<UnixNanos>,
75    pub limit: Option<u64>,
76    pub price_precision: Option<u8>,
77}
78
79/// Result containing dataset date range information.
80#[derive(Debug, Clone)]
81pub struct DatasetRange {
82    pub start: String,
83    pub end: String,
84}
85
86impl DatabentoHistoricalClient {
87    /// Creates a new [`DatabentoHistoricalClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if client creation or publisher loading fails.
92    pub fn new(
93        key: String,
94        publishers_filepath: PathBuf,
95        clock: &'static AtomicTime,
96        use_exchange_as_venue: bool,
97    ) -> anyhow::Result<Self> {
98        let client = databento::HistoricalClient::builder()
99            .user_agent_extension(NAUTILUS_USER_AGENT.into())
100            .key(key.clone())
101            .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
102            .build()
103            .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
104
105        let file_content = fs::read_to_string(publishers_filepath)?;
106        let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
107
108        let publisher_venue_map = publishers_vec
109            .into_iter()
110            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
111            .collect::<IndexMap<u16, Venue>>();
112
113        Ok(Self {
114            clock,
115            inner: Arc::new(tokio::sync::Mutex::new(client)),
116            publisher_venue_map: Arc::new(publisher_venue_map),
117            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
118            key,
119            use_exchange_as_venue,
120        })
121    }
122
123    /// Gets the date range for a specific dataset.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the API request fails.
128    pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
129        let mut client = self.inner.lock().await;
130        let response = client
131            .metadata()
132            .get_dataset_range(dataset)
133            .await
134            .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
135
136        Ok(DatasetRange {
137            start: response.start.to_string(),
138            end: response.end.to_string(),
139        })
140    }
141
142    /// Fetches instrument definitions for the given parameters.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the API request or data processing fails.
147    pub async fn get_range_instruments(
148        &self,
149        params: RangeQueryParams,
150    ) -> anyhow::Result<Vec<InstrumentAny>> {
151        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
152        check_consistent_symbology(&symbols)?;
153
154        let first_symbol = params
155            .symbols
156            .first()
157            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
158        let stype_in = infer_symbology_type(first_symbol);
159        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
160        let time_range = get_date_time_range(params.start, end)?;
161
162        let range_params = GetRangeParams::builder()
163            .dataset(params.dataset)
164            .date_time_range(time_range)
165            .symbols(symbols)
166            .stype_in(stype_in)
167            .schema(dbn::Schema::Definition)
168            .limit(params.limit.and_then(NonZeroU64::new))
169            .build();
170
171        let mut client = self.inner.lock().await;
172        let mut decoder = client
173            .timeseries()
174            .get_range(&range_params)
175            .await
176            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
177
178        let metadata = decoder.metadata().clone();
179        let mut metadata_cache = MetadataCache::new(metadata);
180        let mut instruments = Vec::new();
181
182        while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183            let record = dbn::RecordRef::from(msg);
184            let sym_map = self
185                .symbol_venue_map
186                .read()
187                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
188            let mut instrument_id = decode_nautilus_instrument_id(
189                &record,
190                &mut metadata_cache,
191                &self.publisher_venue_map,
192                &sym_map,
193            )?;
194
195            if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
196                let exchange = msg
197                    .exchange()
198                    .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
199                let venue = Venue::from_code(exchange)
200                    .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
201                instrument_id.venue = venue;
202            }
203
204            match decode_instrument_def_msg(msg, instrument_id, None) {
205                Ok(instrument) => instruments.push(instrument),
206                Err(e) => log::error!("Failed to decode instrument: {e:?}"),
207            }
208        }
209
210        Ok(instruments)
211    }
212
213    /// Fetches quote ticks for the given parameters.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the API request or data processing fails.
218    pub async fn get_range_quotes(
219        &self,
220        params: RangeQueryParams,
221        schema: Option<String>,
222    ) -> anyhow::Result<Vec<QuoteTick>> {
223        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
224        check_consistent_symbology(&symbols)?;
225
226        let first_symbol = params
227            .symbols
228            .first()
229            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
230        let stype_in = infer_symbology_type(first_symbol);
231        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
232        let time_range = get_date_time_range(params.start, end)?;
233        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
234        let dbn_schema = dbn::Schema::from_str(&schema)?;
235
236        match dbn_schema {
237            dbn::Schema::Mbp1
238            | dbn::Schema::Bbo1S
239            | dbn::Schema::Bbo1M
240            | dbn::Schema::Cmbp1
241            | dbn::Schema::Cbbo1S
242            | dbn::Schema::Cbbo1M => (),
243            _ => anyhow::bail!(
244                "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
245            ),
246        }
247
248        let range_params = GetRangeParams::builder()
249            .dataset(params.dataset)
250            .date_time_range(time_range)
251            .symbols(symbols)
252            .stype_in(stype_in)
253            .schema(dbn_schema)
254            .limit(params.limit.and_then(NonZeroU64::new))
255            .build();
256
257        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
258
259        let mut client = self.inner.lock().await;
260        let mut decoder = client
261            .timeseries()
262            .get_range(&range_params)
263            .await
264            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
265
266        let metadata = decoder.metadata().clone();
267        let mut metadata_cache = MetadataCache::new(metadata);
268        let mut result: Vec<QuoteTick> = Vec::new();
269
270        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
271            let sym_map = self
272                .symbol_venue_map
273                .read()
274                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
275            let instrument_id = decode_nautilus_instrument_id(
276                &record,
277                &mut metadata_cache,
278                &self.publisher_venue_map,
279                &sym_map,
280            )?;
281
282            let (data, _) = decode_record(
283                &record,
284                instrument_id,
285                price_precision,
286                None,
287                false, // Don't include trades
288                true,
289            )?;
290
291            match data {
292                Some(Data::Quote(quote)) => result.push(quote),
293                None => {} // Skip records with undefined bid/ask prices
294                _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
295            }
296            Ok(())
297        };
298
299        match dbn_schema {
300            dbn::Schema::Mbp1 => {
301                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
302                    process_record(dbn::RecordRef::from(msg))?;
303                }
304            }
305            dbn::Schema::Cmbp1 => {
306                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Cmbp1Msg>().await {
307                    process_record(dbn::RecordRef::from(msg))?;
308                }
309            }
310            dbn::Schema::Bbo1M => {
311                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
312                    process_record(dbn::RecordRef::from(msg))?;
313                }
314            }
315            dbn::Schema::Bbo1S => {
316                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
317                    process_record(dbn::RecordRef::from(msg))?;
318                }
319            }
320            dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
321                while let Ok(Some(msg)) = decoder.decode_record::<dbn::CbboMsg>().await {
322                    process_record(dbn::RecordRef::from(msg))?;
323                }
324            }
325            _ => anyhow::bail!("Invalid schema {dbn_schema}"),
326        }
327
328        Ok(result)
329    }
330
331    /// Fetches order book depth10 snapshots for the given parameters.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the API request or data processing fails.
336    pub async fn get_range_order_book_depth10(
337        &self,
338        params: RangeQueryParams,
339        depth: Option<usize>,
340    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
341        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
342        check_consistent_symbology(&symbols)?;
343
344        let first_symbol = params
345            .symbols
346            .first()
347            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
348        let stype_in = infer_symbology_type(first_symbol);
349        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
350        let time_range = get_date_time_range(params.start, end)?;
351
352        // For now, only support MBP_10 schema for depth 10
353        let _depth = depth.unwrap_or(10);
354        if _depth != 10 {
355            anyhow::bail!("Only depth=10 is currently supported for order book depths");
356        }
357
358        let range_params = GetRangeParams::builder()
359            .dataset(params.dataset)
360            .date_time_range(time_range)
361            .symbols(symbols)
362            .stype_in(stype_in)
363            .schema(dbn::Schema::Mbp10)
364            .limit(params.limit.and_then(NonZeroU64::new))
365            .build();
366
367        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
368
369        let mut client = self.inner.lock().await;
370        let mut decoder = client
371            .timeseries()
372            .get_range(&range_params)
373            .await
374            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
375
376        let metadata = decoder.metadata().clone();
377        let mut metadata_cache = MetadataCache::new(metadata);
378        let mut result: Vec<OrderBookDepth10> = Vec::new();
379
380        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
381            let sym_map = self
382                .symbol_venue_map
383                .read()
384                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
385            let instrument_id = decode_nautilus_instrument_id(
386                &record,
387                &mut metadata_cache,
388                &self.publisher_venue_map,
389                &sym_map,
390            )?;
391
392            if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
393                let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
394                result.push(depth);
395            }
396
397            Ok(())
398        };
399
400        while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
401            process_record(dbn::RecordRef::from(msg))?;
402        }
403
404        Ok(result)
405    }
406
407    /// Fetches order book deltas for the given parameters.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the API request or data processing fails.
412    pub async fn get_range_order_book_deltas(
413        &self,
414        params: RangeQueryParams,
415    ) -> anyhow::Result<Vec<OrderBookDelta>> {
416        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
417        check_consistent_symbology(&symbols)?;
418
419        let first_symbol = params
420            .symbols
421            .first()
422            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
423        let stype_in = infer_symbology_type(first_symbol);
424        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
425        let time_range = get_date_time_range(params.start, end)?;
426
427        let range_params = GetRangeParams::builder()
428            .dataset(params.dataset)
429            .date_time_range(time_range)
430            .symbols(symbols)
431            .stype_in(stype_in)
432            .schema(dbn::Schema::Mbo)
433            .limit(params.limit.and_then(NonZeroU64::new))
434            .build();
435
436        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
437
438        let mut client = self.inner.lock().await;
439        let mut decoder = client
440            .timeseries()
441            .get_range(&range_params)
442            .await
443            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
444
445        let metadata = decoder.metadata().clone();
446        let mut metadata_cache = MetadataCache::new(metadata);
447        let mut result: Vec<OrderBookDelta> = Vec::new();
448
449        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
450            let sym_map = self
451                .symbol_venue_map
452                .read()
453                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
454            let instrument_id = decode_nautilus_instrument_id(
455                &record,
456                &mut metadata_cache,
457                &self.publisher_venue_map,
458                &sym_map,
459            )?;
460
461            if let Some(msg) = record.get::<dbn::MboMsg>() {
462                let (delta, _trade) =
463                    decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
464                if let Some(delta) = delta {
465                    result.push(delta);
466                }
467            }
468
469            Ok(())
470        };
471
472        while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await {
473            process_record(dbn::RecordRef::from(msg))?;
474        }
475
476        Ok(result)
477    }
478
479    /// Fetches trade ticks for the given parameters.
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if the API request or data processing fails.
484    pub async fn get_range_trades(
485        &self,
486        params: RangeQueryParams,
487    ) -> anyhow::Result<Vec<TradeTick>> {
488        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
489        check_consistent_symbology(&symbols)?;
490
491        let first_symbol = params
492            .symbols
493            .first()
494            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
495        let stype_in = infer_symbology_type(first_symbol);
496        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
497        let time_range = get_date_time_range(params.start, end)?;
498
499        let range_params = GetRangeParams::builder()
500            .dataset(params.dataset)
501            .date_time_range(time_range)
502            .symbols(symbols)
503            .stype_in(stype_in)
504            .schema(dbn::Schema::Trades)
505            .limit(params.limit.and_then(NonZeroU64::new))
506            .build();
507
508        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
509
510        let mut client = self.inner.lock().await;
511        let mut decoder = client
512            .timeseries()
513            .get_range(&range_params)
514            .await
515            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
516
517        let metadata = decoder.metadata().clone();
518        let mut metadata_cache = MetadataCache::new(metadata);
519        let mut result: Vec<TradeTick> = Vec::new();
520
521        while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
522            let record = dbn::RecordRef::from(msg);
523            let sym_map = self
524                .symbol_venue_map
525                .read()
526                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
527            let instrument_id = decode_nautilus_instrument_id(
528                &record,
529                &mut metadata_cache,
530                &self.publisher_venue_map,
531                &sym_map,
532            )?;
533
534            let (data, _) = decode_record(
535                &record,
536                instrument_id,
537                price_precision,
538                None,
539                false, // Not applicable (trade will be decoded regardless)
540                true,
541            )?;
542
543            match data {
544                Some(Data::Trade(trade)) => {
545                    result.push(trade);
546                }
547                _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
548            }
549        }
550
551        Ok(result)
552    }
553
554    /// Fetches bars for the given parameters.
555    ///
556    /// # Errors
557    ///
558    /// Returns an error if the API request or data processing fails.
559    pub async fn get_range_bars(
560        &self,
561        params: RangeQueryParams,
562        aggregation: BarAggregation,
563        timestamp_on_close: bool,
564    ) -> anyhow::Result<Vec<Bar>> {
565        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
566        check_consistent_symbology(&symbols)?;
567
568        let first_symbol = params
569            .symbols
570            .first()
571            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
572        let stype_in = infer_symbology_type(first_symbol);
573        let schema = match aggregation {
574            BarAggregation::Second => dbn::Schema::Ohlcv1S,
575            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
576            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
577            BarAggregation::Day => dbn::Schema::Ohlcv1D,
578            _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
579        };
580
581        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
582        let time_range = get_date_time_range(params.start, end)?;
583
584        let range_params = GetRangeParams::builder()
585            .dataset(params.dataset)
586            .date_time_range(time_range)
587            .symbols(symbols)
588            .stype_in(stype_in)
589            .schema(schema)
590            .limit(params.limit.and_then(NonZeroU64::new))
591            .build();
592
593        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
594
595        let mut client = self.inner.lock().await;
596        let mut decoder = client
597            .timeseries()
598            .get_range(&range_params)
599            .await
600            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
601
602        let metadata = decoder.metadata().clone();
603        let mut metadata_cache = MetadataCache::new(metadata);
604        let mut result: Vec<Bar> = Vec::new();
605
606        while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
607            let record = dbn::RecordRef::from(msg);
608            let sym_map = self
609                .symbol_venue_map
610                .read()
611                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
612            let instrument_id = decode_nautilus_instrument_id(
613                &record,
614                &mut metadata_cache,
615                &self.publisher_venue_map,
616                &sym_map,
617            )?;
618
619            let (data, _) = decode_record(
620                &record,
621                instrument_id,
622                price_precision,
623                None,
624                false, // Not applicable
625                timestamp_on_close,
626            )?;
627
628            match data {
629                Some(Data::Bar(bar)) => {
630                    result.push(bar);
631                }
632                _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
633            }
634        }
635
636        Ok(result)
637    }
638
639    /// Fetches imbalance data for the given parameters.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the API request or data processing fails.
644    pub async fn get_range_imbalance(
645        &self,
646        params: RangeQueryParams,
647    ) -> anyhow::Result<Vec<DatabentoImbalance>> {
648        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
649        check_consistent_symbology(&symbols)?;
650
651        let first_symbol = params
652            .symbols
653            .first()
654            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
655        let stype_in = infer_symbology_type(first_symbol);
656        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
657        let time_range = get_date_time_range(params.start, end)?;
658
659        let range_params = GetRangeParams::builder()
660            .dataset(params.dataset)
661            .date_time_range(time_range)
662            .symbols(symbols)
663            .stype_in(stype_in)
664            .schema(dbn::Schema::Imbalance)
665            .limit(params.limit.and_then(NonZeroU64::new))
666            .build();
667
668        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
669
670        let mut client = self.inner.lock().await;
671        let mut decoder = client
672            .timeseries()
673            .get_range(&range_params)
674            .await
675            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
676
677        let metadata = decoder.metadata().clone();
678        let mut metadata_cache = MetadataCache::new(metadata);
679        let mut result: Vec<DatabentoImbalance> = Vec::new();
680
681        while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
682            let record = dbn::RecordRef::from(msg);
683            let sym_map = self
684                .symbol_venue_map
685                .read()
686                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
687            let instrument_id = decode_nautilus_instrument_id(
688                &record,
689                &mut metadata_cache,
690                &self.publisher_venue_map,
691                &sym_map,
692            )?;
693
694            let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
695            result.push(imbalance);
696        }
697
698        Ok(result)
699    }
700
701    /// Fetches statistics data for the given parameters.
702    ///
703    /// # Errors
704    ///
705    /// Returns an error if the API request or data processing fails.
706    pub async fn get_range_statistics(
707        &self,
708        params: RangeQueryParams,
709    ) -> anyhow::Result<Vec<DatabentoStatistics>> {
710        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
711        check_consistent_symbology(&symbols)?;
712
713        let first_symbol = params
714            .symbols
715            .first()
716            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
717        let stype_in = infer_symbology_type(first_symbol);
718        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
719        let time_range = get_date_time_range(params.start, end)?;
720
721        let range_params = GetRangeParams::builder()
722            .dataset(params.dataset)
723            .date_time_range(time_range)
724            .symbols(symbols)
725            .stype_in(stype_in)
726            .schema(dbn::Schema::Statistics)
727            .limit(params.limit.and_then(NonZeroU64::new))
728            .build();
729
730        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
731
732        let mut client = self.inner.lock().await;
733        let mut decoder = client
734            .timeseries()
735            .get_range(&range_params)
736            .await
737            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
738
739        let metadata = decoder.metadata().clone();
740        let mut metadata_cache = MetadataCache::new(metadata);
741        let mut result: Vec<DatabentoStatistics> = Vec::new();
742
743        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
744            let record = dbn::RecordRef::from(msg);
745            let sym_map = self
746                .symbol_venue_map
747                .read()
748                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
749            let instrument_id = decode_nautilus_instrument_id(
750                &record,
751                &mut metadata_cache,
752                &self.publisher_venue_map,
753                &sym_map,
754            )?;
755
756            let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
757            result.push(statistics);
758        }
759
760        Ok(result)
761    }
762
763    /// Fetches status data for the given parameters.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the API request or data processing fails.
768    pub async fn get_range_status(
769        &self,
770        params: RangeQueryParams,
771    ) -> anyhow::Result<Vec<InstrumentStatus>> {
772        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
773        check_consistent_symbology(&symbols)?;
774
775        let first_symbol = params
776            .symbols
777            .first()
778            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
779        let stype_in = infer_symbology_type(first_symbol);
780        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
781        let time_range = get_date_time_range(params.start, end)?;
782
783        let range_params = GetRangeParams::builder()
784            .dataset(params.dataset)
785            .date_time_range(time_range)
786            .symbols(symbols)
787            .stype_in(stype_in)
788            .schema(dbn::Schema::Status)
789            .limit(params.limit.and_then(NonZeroU64::new))
790            .build();
791
792        let mut client = self.inner.lock().await;
793        let mut decoder = client
794            .timeseries()
795            .get_range(&range_params)
796            .await
797            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
798
799        let metadata = decoder.metadata().clone();
800        let mut metadata_cache = MetadataCache::new(metadata);
801        let mut result: Vec<InstrumentStatus> = Vec::new();
802
803        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
804            let record = dbn::RecordRef::from(msg);
805            let sym_map = self
806                .symbol_venue_map
807                .read()
808                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
809            let instrument_id = decode_nautilus_instrument_id(
810                &record,
811                &mut metadata_cache,
812                &self.publisher_venue_map,
813                &sym_map,
814            )?;
815
816            let status = decode_status_msg(msg, instrument_id, None)?;
817            result.push(status);
818        }
819
820        Ok(result)
821    }
822
823    /// Helper method to prepare symbols from instrument IDs.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the symbol venue map lock is poisoned.
828    pub fn prepare_symbols_from_instrument_ids(
829        &self,
830        instrument_ids: &[InstrumentId],
831    ) -> anyhow::Result<Vec<String>> {
832        let mut symbol_venue_map = self
833            .symbol_venue_map
834            .write()
835            .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
836
837        let symbols: Vec<String> = instrument_ids
838            .iter()
839            .map(|instrument_id| {
840                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
841            })
842            .collect();
843
844        Ok(symbols)
845    }
846}