nautilus_databento/python/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    num::NonZeroU64,
20    path::PathBuf,
21    str::FromStr,
22    sync::{Arc, RwLock},
23};
24
25use databento::{
26    dbn::{self},
27    historical::timeseries::GetRangeParams,
28};
29use indexmap::IndexMap;
30use nautilus_core::{
31    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
32    time::{AtomicTime, get_atomic_clock_realtime},
33};
34use nautilus_model::{
35    data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
36    enums::BarAggregation,
37    identifiers::{InstrumentId, Symbol, Venue},
38    python::instruments::instrument_any_to_pyobject,
39    types::Currency,
40};
41use pyo3::{
42    IntoPyObjectExt,
43    exceptions::PyException,
44    prelude::*,
45    types::{PyDict, PyList},
46};
47use tokio::sync::Mutex;
48
49use crate::{
50    common::get_date_time_range,
51    decode::{
52        decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
53        decode_status_msg,
54    },
55    symbology::{
56        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
57        infer_symbology_type, instrument_id_to_symbol_string,
58    },
59    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
60};
61
62#[cfg_attr(
63    feature = "python",
64    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
65)]
66#[derive(Debug)]
67pub struct DatabentoHistoricalClient {
68    #[pyo3(get)]
69    pub key: String,
70    clock: &'static AtomicTime,
71    inner: Arc<Mutex<databento::HistoricalClient>>,
72    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
73    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
74    use_exchange_as_venue: bool,
75}
76
77#[pymethods]
78impl DatabentoHistoricalClient {
79    #[new]
80    fn py_new(
81        key: String,
82        publishers_filepath: PathBuf,
83        use_exchange_as_venue: bool,
84    ) -> PyResult<Self> {
85        let client = databento::HistoricalClient::builder()
86            .key(key.clone())
87            .map_err(to_pyvalue_err)?
88            .build()
89            .map_err(to_pyvalue_err)?;
90
91        let file_content = fs::read_to_string(publishers_filepath)?;
92        let publishers_vec: Vec<DatabentoPublisher> =
93            serde_json::from_str(&file_content).map_err(to_pyvalue_err)?;
94
95        let publisher_venue_map = publishers_vec
96            .into_iter()
97            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
98            .collect::<IndexMap<u16, Venue>>();
99
100        Ok(Self {
101            clock: get_atomic_clock_realtime(),
102            inner: Arc::new(Mutex::new(client)),
103            publisher_venue_map: Arc::new(publisher_venue_map),
104            symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
105            key,
106            use_exchange_as_venue,
107        })
108    }
109
110    #[pyo3(name = "get_dataset_range")]
111    fn py_get_dataset_range<'py>(
112        &self,
113        py: Python<'py>,
114        dataset: String,
115    ) -> PyResult<Bound<'py, PyAny>> {
116        let client = self.inner.clone();
117
118        pyo3_async_runtimes::tokio::future_into_py(py, async move {
119            let mut client = client.lock().await; // TODO: Use a client pool
120            let response = client.metadata().get_dataset_range(&dataset).await;
121            match response {
122                Ok(res) => Python::with_gil(|py| {
123                    let dict = PyDict::new(py);
124                    dict.set_item("start", res.start.to_string())?;
125                    dict.set_item("end", res.end.to_string())?;
126                    dict.into_py_any(py)
127                }),
128                Err(e) => Err(PyErr::new::<PyException, _>(format!(
129                    "Error handling response: {e}"
130                ))),
131            }
132        })
133    }
134
135    #[pyo3(name = "get_range_instruments")]
136    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
137    #[allow(clippy::too_many_arguments)]
138    fn py_get_range_instruments<'py>(
139        &self,
140        py: Python<'py>,
141        dataset: String,
142        instrument_ids: Vec<InstrumentId>,
143        start: u64,
144        end: Option<u64>,
145        limit: Option<u64>,
146    ) -> PyResult<Bound<'py, PyAny>> {
147        let client = self.inner.clone();
148        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
149        let symbols: Vec<String> = instrument_ids
150            .iter()
151            .map(|instrument_id| {
152                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
153            })
154            .collect();
155
156        let stype_in = infer_symbology_type(symbols.first().unwrap());
157        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
158        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
159        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
160        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
161        let params = GetRangeParams::builder()
162            .dataset(dataset)
163            .date_time_range(time_range)
164            .symbols(symbols)
165            .stype_in(stype_in)
166            .schema(dbn::Schema::Definition)
167            .limit(limit.and_then(NonZeroU64::new))
168            .build();
169
170        let publisher_venue_map = self.publisher_venue_map.clone();
171        let symbol_venue_map = self.symbol_venue_map.clone();
172        let ts_init = self.clock.get_time_ns();
173        let use_exchange_as_venue = self.use_exchange_as_venue;
174
175        pyo3_async_runtimes::tokio::future_into_py(py, async move {
176            let mut client = client.lock().await; // TODO: Use a client pool
177            let mut decoder = client
178                .timeseries()
179                .get_range(&params)
180                .await
181                .map_err(to_pyvalue_err)?;
182
183            decoder.set_upgrade_policy(dbn::VersionUpgradePolicy::UpgradeToV2);
184
185            let metadata = decoder.metadata().clone();
186            let mut metadata_cache = MetadataCache::new(metadata);
187            let mut instruments = Vec::new();
188
189            while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
190                let record = dbn::RecordRef::from(msg);
191                let mut instrument_id = decode_nautilus_instrument_id(
192                    &record,
193                    &mut metadata_cache,
194                    &publisher_venue_map,
195                    &symbol_venue_map.read().unwrap(),
196                )
197                .map_err(to_pyvalue_err)?;
198
199                if use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
200                    let exchange = msg.exchange().unwrap();
201                    let venue = Venue::from_code(exchange)
202                        .unwrap_or_else(|_| panic!("`Venue` not found for exchange {exchange}"));
203                    instrument_id.venue = venue;
204                }
205
206                let result = decode_instrument_def_msg(msg, instrument_id, ts_init);
207                match result {
208                    Ok(instrument) => instruments.push(instrument),
209                    Err(e) => tracing::error!("{e:?}"),
210                }
211            }
212
213            Python::with_gil(|py| {
214                let py_results: PyResult<Vec<PyObject>> = instruments
215                    .into_iter()
216                    .map(|result| instrument_any_to_pyobject(py, result))
217                    .collect();
218
219                py_results.map(|objs| {
220                    PyList::new(py, &objs)
221                        .expect("Invalid `ExactSizeIterator`")
222                        .into_py_any_unwrap(py)
223                })
224            })
225        })
226    }
227
228    #[pyo3(name = "get_range_quotes")]
229    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
230    #[allow(clippy::too_many_arguments)]
231    fn py_get_range_quotes<'py>(
232        &self,
233        py: Python<'py>,
234        dataset: String,
235        instrument_ids: Vec<InstrumentId>,
236        start: u64,
237        end: Option<u64>,
238        limit: Option<u64>,
239        price_precision: Option<u8>,
240        schema: Option<String>,
241    ) -> PyResult<Bound<'py, PyAny>> {
242        let client = self.inner.clone();
243        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
244        let symbols: Vec<String> = instrument_ids
245            .iter()
246            .map(|instrument_id| {
247                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
248            })
249            .collect();
250
251        let stype_in = infer_symbology_type(symbols.first().unwrap());
252        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
253        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
254        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
255        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
256        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
257        let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?;
258        match dbn_schema {
259            dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
260            _ => {
261                return Err(to_pyvalue_err(
262                    "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m",
263                ));
264            }
265        }
266        let params = GetRangeParams::builder()
267            .dataset(dataset)
268            .date_time_range(time_range)
269            .symbols(symbols)
270            .stype_in(stype_in)
271            .schema(dbn_schema)
272            .limit(limit.and_then(NonZeroU64::new))
273            .build();
274
275        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
276        let publisher_venue_map = self.publisher_venue_map.clone();
277        let symbol_venue_map = self.symbol_venue_map.clone();
278        let ts_init = self.clock.get_time_ns();
279
280        pyo3_async_runtimes::tokio::future_into_py(py, async move {
281            let mut client = client.lock().await; // TODO: Use a client pool
282            let mut decoder = client
283                .timeseries()
284                .get_range(&params)
285                .await
286                .map_err(to_pyvalue_err)?;
287
288            let metadata = decoder.metadata().clone();
289            let mut metadata_cache = MetadataCache::new(metadata);
290            let mut result: Vec<QuoteTick> = Vec::new();
291
292            let mut process_record = |record: dbn::RecordRef| -> PyResult<()> {
293                let instrument_id = decode_nautilus_instrument_id(
294                    &record,
295                    &mut metadata_cache,
296                    &publisher_venue_map,
297                    &symbol_venue_map.read().unwrap(),
298                )
299                .map_err(to_pyvalue_err)?;
300
301                let (data, _) = decode_record(
302                    &record,
303                    instrument_id,
304                    price_precision,
305                    Some(ts_init),
306                    false, // Don't include trades
307                )
308                .map_err(to_pyvalue_err)?;
309
310                match data {
311                    Some(Data::Quote(quote)) => {
312                        result.push(quote);
313                        Ok(())
314                    }
315                    _ => panic!("Invalid data element not `QuoteTick`, was {data:?}"),
316                }
317            };
318
319            match dbn_schema {
320                dbn::Schema::Mbp1 => {
321                    while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
322                        process_record(dbn::RecordRef::from(msg))?;
323                    }
324                }
325                dbn::Schema::Bbo1M => {
326                    while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
327                        process_record(dbn::RecordRef::from(msg))?;
328                    }
329                }
330                dbn::Schema::Bbo1S => {
331                    while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
332                        process_record(dbn::RecordRef::from(msg))?;
333                    }
334                }
335                _ => panic!("Invalid schema {dbn_schema}"),
336            }
337
338            Python::with_gil(|py| result.into_py_any(py))
339        })
340    }
341
342    #[pyo3(name = "get_range_trades")]
343    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
344    #[allow(clippy::too_many_arguments)]
345    fn py_get_range_trades<'py>(
346        &self,
347        py: Python<'py>,
348        dataset: String,
349        instrument_ids: Vec<InstrumentId>,
350        start: u64,
351        end: Option<u64>,
352        limit: Option<u64>,
353        price_precision: Option<u8>,
354    ) -> PyResult<Bound<'py, PyAny>> {
355        let client = self.inner.clone();
356        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
357        let symbols: Vec<String> = instrument_ids
358            .iter()
359            .map(|instrument_id| {
360                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
361            })
362            .collect();
363
364        let stype_in = infer_symbology_type(symbols.first().unwrap());
365        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
366        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
367        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
368        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
369        let params = GetRangeParams::builder()
370            .dataset(dataset)
371            .date_time_range(time_range)
372            .symbols(symbols)
373            .stype_in(stype_in)
374            .schema(dbn::Schema::Trades)
375            .limit(limit.and_then(NonZeroU64::new))
376            .build();
377
378        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
379        let publisher_venue_map = self.publisher_venue_map.clone();
380        let symbol_venue_map = self.symbol_venue_map.clone();
381        let ts_init = self.clock.get_time_ns();
382
383        pyo3_async_runtimes::tokio::future_into_py(py, async move {
384            let mut client = client.lock().await; // TODO: Use a client pool
385            let mut decoder = client
386                .timeseries()
387                .get_range(&params)
388                .await
389                .map_err(to_pyvalue_err)?;
390
391            let metadata = decoder.metadata().clone();
392            let mut metadata_cache = MetadataCache::new(metadata);
393            let mut result: Vec<TradeTick> = Vec::new();
394
395            while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
396                let record = dbn::RecordRef::from(msg);
397                let instrument_id = decode_nautilus_instrument_id(
398                    &record,
399                    &mut metadata_cache,
400                    &publisher_venue_map,
401                    &symbol_venue_map.read().unwrap(),
402                )
403                .map_err(to_pyvalue_err)?;
404
405                let (data, _) = decode_record(
406                    &record,
407                    instrument_id,
408                    price_precision,
409                    Some(ts_init),
410                    false, // Not applicable (trade will be decoded regardless)
411                )
412                .map_err(to_pyvalue_err)?;
413
414                match data {
415                    Some(Data::Trade(trade)) => {
416                        result.push(trade);
417                    }
418                    _ => panic!("Invalid data element not `TradeTick`, was {data:?}"),
419                }
420            }
421
422            Python::with_gil(|py| result.into_py_any(py))
423        })
424    }
425
426    #[pyo3(name = "get_range_bars")]
427    #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None))]
428    #[allow(clippy::too_many_arguments)]
429    fn py_get_range_bars<'py>(
430        &self,
431        py: Python<'py>,
432        dataset: String,
433        instrument_ids: Vec<InstrumentId>,
434        aggregation: BarAggregation,
435        start: u64,
436        end: Option<u64>,
437        limit: Option<u64>,
438        price_precision: Option<u8>,
439    ) -> PyResult<Bound<'py, PyAny>> {
440        let client = self.inner.clone();
441        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
442        let symbols: Vec<String> = instrument_ids
443            .iter()
444            .map(|instrument_id| {
445                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
446            })
447            .collect();
448
449        let stype_in = infer_symbology_type(symbols.first().unwrap());
450        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
451        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
452        let schema = match aggregation {
453            BarAggregation::Second => dbn::Schema::Ohlcv1S,
454            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
455            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
456            BarAggregation::Day => dbn::Schema::Ohlcv1D,
457            _ => panic!("Invalid `BarAggregation` for request, was {aggregation}"),
458        };
459        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
460        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
461        let params = GetRangeParams::builder()
462            .dataset(dataset)
463            .date_time_range(time_range)
464            .symbols(symbols)
465            .stype_in(stype_in)
466            .schema(schema)
467            .limit(limit.and_then(NonZeroU64::new))
468            .build();
469
470        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
471        let publisher_venue_map = self.publisher_venue_map.clone();
472        let symbol_venue_map = self.symbol_venue_map.clone();
473        let ts_init = self.clock.get_time_ns();
474
475        pyo3_async_runtimes::tokio::future_into_py(py, async move {
476            let mut client = client.lock().await; // TODO: Use a client pool
477            let mut decoder = client
478                .timeseries()
479                .get_range(&params)
480                .await
481                .map_err(to_pyvalue_err)?;
482
483            let metadata = decoder.metadata().clone();
484            let mut metadata_cache = MetadataCache::new(metadata);
485            let mut result: Vec<Bar> = Vec::new();
486
487            while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
488                let record = dbn::RecordRef::from(msg);
489                let instrument_id = decode_nautilus_instrument_id(
490                    &record,
491                    &mut metadata_cache,
492                    &publisher_venue_map,
493                    &symbol_venue_map.read().unwrap(),
494                )
495                .map_err(to_pyvalue_err)?;
496
497                let (data, _) = decode_record(
498                    &record,
499                    instrument_id,
500                    price_precision,
501                    Some(ts_init),
502                    false, // Not applicable
503                )
504                .map_err(to_pyvalue_err)?;
505
506                match data {
507                    Some(Data::Bar(bar)) => {
508                        result.push(bar);
509                    }
510                    _ => panic!("Invalid data element not `Bar`, was {data:?}"),
511                }
512            }
513
514            Python::with_gil(|py| result.into_py_any(py))
515        })
516    }
517
518    #[pyo3(name = "get_range_imbalance")]
519    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
520    #[allow(clippy::too_many_arguments)]
521    fn py_get_range_imbalance<'py>(
522        &self,
523        py: Python<'py>,
524        dataset: String,
525        instrument_ids: Vec<InstrumentId>,
526        start: u64,
527        end: Option<u64>,
528        limit: Option<u64>,
529        price_precision: Option<u8>,
530    ) -> PyResult<Bound<'py, PyAny>> {
531        let client = self.inner.clone();
532        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
533        let symbols: Vec<String> = instrument_ids
534            .iter()
535            .map(|instrument_id| {
536                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
537            })
538            .collect();
539
540        let stype_in = infer_symbology_type(symbols.first().unwrap());
541        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
542        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
543        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
544        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
545        let params = GetRangeParams::builder()
546            .dataset(dataset)
547            .date_time_range(time_range)
548            .symbols(symbols)
549            .stype_in(stype_in)
550            .schema(dbn::Schema::Imbalance)
551            .limit(limit.and_then(NonZeroU64::new))
552            .build();
553
554        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
555        let publisher_venue_map = self.publisher_venue_map.clone();
556        let symbol_venue_map = self.symbol_venue_map.clone();
557        let ts_init = self.clock.get_time_ns();
558
559        pyo3_async_runtimes::tokio::future_into_py(py, async move {
560            let mut client = client.lock().await; // TODO: Use a client pool
561            let mut decoder = client
562                .timeseries()
563                .get_range(&params)
564                .await
565                .map_err(to_pyvalue_err)?;
566
567            let metadata = decoder.metadata().clone();
568            let mut metadata_cache = MetadataCache::new(metadata);
569            let mut result: Vec<DatabentoImbalance> = Vec::new();
570
571            while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
572                let record = dbn::RecordRef::from(msg);
573                let instrument_id = decode_nautilus_instrument_id(
574                    &record,
575                    &mut metadata_cache,
576                    &publisher_venue_map,
577                    &symbol_venue_map.read().unwrap(),
578                )
579                .map_err(to_pyvalue_err)?;
580
581                let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
582                    .map_err(to_pyvalue_err)?;
583
584                result.push(imbalance);
585            }
586
587            Python::with_gil(|py| result.into_py_any(py))
588        })
589    }
590
591    #[pyo3(name = "get_range_statistics")]
592    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
593    #[allow(clippy::too_many_arguments)]
594    fn py_get_range_statistics<'py>(
595        &self,
596        py: Python<'py>,
597        dataset: String,
598        instrument_ids: Vec<InstrumentId>,
599        start: u64,
600        end: Option<u64>,
601        limit: Option<u64>,
602        price_precision: Option<u8>,
603    ) -> PyResult<Bound<'py, PyAny>> {
604        let client = self.inner.clone();
605        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
606        let symbols: Vec<String> = instrument_ids
607            .iter()
608            .map(|instrument_id| {
609                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
610            })
611            .collect();
612
613        let stype_in = infer_symbology_type(symbols.first().unwrap());
614        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
615        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
616        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
617        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
618        let params = GetRangeParams::builder()
619            .dataset(dataset)
620            .date_time_range(time_range)
621            .symbols(symbols)
622            .stype_in(stype_in)
623            .schema(dbn::Schema::Statistics)
624            .limit(limit.and_then(NonZeroU64::new))
625            .build();
626
627        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
628        let publisher_venue_map = self.publisher_venue_map.clone();
629        let symbol_venue_map = self.symbol_venue_map.clone();
630        let ts_init = self.clock.get_time_ns();
631
632        pyo3_async_runtimes::tokio::future_into_py(py, async move {
633            let mut client = client.lock().await; // TODO: Use a client pool
634            let mut decoder = client
635                .timeseries()
636                .get_range(&params)
637                .await
638                .map_err(to_pyvalue_err)?;
639
640            let metadata = decoder.metadata().clone();
641            let mut metadata_cache = MetadataCache::new(metadata);
642            let mut result: Vec<DatabentoStatistics> = Vec::new();
643
644            while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
645                let record = dbn::RecordRef::from(msg);
646                let instrument_id = decode_nautilus_instrument_id(
647                    &record,
648                    &mut metadata_cache,
649                    &publisher_venue_map,
650                    &symbol_venue_map.read().unwrap(),
651                )
652                .map_err(to_pyvalue_err)?;
653
654                let statistics =
655                    decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
656                        .map_err(to_pyvalue_err)?;
657
658                result.push(statistics);
659            }
660
661            Python::with_gil(|py| result.into_py_any(py))
662        })
663    }
664
665    #[pyo3(name = "get_range_status")]
666    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
667    #[allow(clippy::too_many_arguments)]
668    fn py_get_range_status<'py>(
669        &self,
670        py: Python<'py>,
671        dataset: String,
672        instrument_ids: Vec<InstrumentId>,
673        start: u64,
674        end: Option<u64>,
675        limit: Option<u64>,
676    ) -> PyResult<Bound<'py, PyAny>> {
677        let client = self.inner.clone();
678        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
679        let symbols: Vec<String> = instrument_ids
680            .iter()
681            .map(|instrument_id| {
682                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
683            })
684            .collect();
685
686        let stype_in = infer_symbology_type(symbols.first().unwrap());
687        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
688        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
689        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
690        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
691        let params = GetRangeParams::builder()
692            .dataset(dataset)
693            .date_time_range(time_range)
694            .symbols(symbols)
695            .stype_in(stype_in)
696            .schema(dbn::Schema::Status)
697            .limit(limit.and_then(NonZeroU64::new))
698            .build();
699
700        let publisher_venue_map = self.publisher_venue_map.clone();
701        let ts_init = self.clock.get_time_ns();
702        let symbol_venue_map = self.symbol_venue_map.clone();
703
704        pyo3_async_runtimes::tokio::future_into_py(py, async move {
705            let mut client = client.lock().await; // TODO: Use a client pool
706            let mut decoder = client
707                .timeseries()
708                .get_range(&params)
709                .await
710                .map_err(to_pyvalue_err)?;
711
712            let metadata = decoder.metadata().clone();
713            let mut metadata_cache = MetadataCache::new(metadata);
714            let mut result: Vec<InstrumentStatus> = Vec::new();
715
716            while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
717                let record = dbn::RecordRef::from(msg);
718                let instrument_id = decode_nautilus_instrument_id(
719                    &record,
720                    &mut metadata_cache,
721                    &publisher_venue_map,
722                    &symbol_venue_map.read().unwrap(),
723                )
724                .map_err(to_pyvalue_err)?;
725
726                let status =
727                    decode_status_msg(msg, instrument_id, ts_init).map_err(to_pyvalue_err)?;
728
729                result.push(status);
730            }
731
732            Python::with_gil(|py| result.into_py_any(py))
733        })
734    }
735}