nautilus_databento/python/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    num::NonZeroU64,
20    path::PathBuf,
21    str::FromStr,
22    sync::{Arc, RwLock},
23};
24
25use databento::{
26    dbn::{self},
27    historical::timeseries::GetRangeParams,
28};
29use indexmap::IndexMap;
30use nautilus_core::{
31    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
32    time::{AtomicTime, get_atomic_clock_realtime},
33};
34use nautilus_model::{
35    data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
36    enums::BarAggregation,
37    identifiers::{InstrumentId, Symbol, Venue},
38    python::instruments::instrument_any_to_pyobject,
39    types::Currency,
40};
41use pyo3::{
42    IntoPyObjectExt,
43    exceptions::PyException,
44    prelude::*,
45    types::{PyDict, PyList},
46};
47use tokio::sync::Mutex;
48
49use crate::{
50    common::get_date_time_range,
51    decode::{
52        decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
53        decode_status_msg,
54    },
55    symbology::{
56        check_consistent_symbology, decode_nautilus_instrument_id, infer_symbology_type,
57        instrument_id_to_symbol_string,
58    },
59    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
60};
61
62#[cfg_attr(
63    feature = "python",
64    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
65)]
66#[derive(Debug)]
67pub struct DatabentoHistoricalClient {
68    #[pyo3(get)]
69    pub key: String,
70    clock: &'static AtomicTime,
71    inner: Arc<Mutex<databento::HistoricalClient>>,
72    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
73    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
74}
75
76#[pymethods]
77impl DatabentoHistoricalClient {
78    #[new]
79    fn py_new(key: String, publishers_filepath: PathBuf) -> PyResult<Self> {
80        let client = databento::HistoricalClient::builder()
81            .key(key.clone())
82            .map_err(to_pyvalue_err)?
83            .build()
84            .map_err(to_pyvalue_err)?;
85
86        let file_content = fs::read_to_string(publishers_filepath)?;
87        let publishers_vec: Vec<DatabentoPublisher> =
88            serde_json::from_str(&file_content).map_err(to_pyvalue_err)?;
89
90        let publisher_venue_map = publishers_vec
91            .into_iter()
92            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
93            .collect::<IndexMap<u16, Venue>>();
94
95        Ok(Self {
96            clock: get_atomic_clock_realtime(),
97            inner: Arc::new(Mutex::new(client)),
98            publisher_venue_map: Arc::new(publisher_venue_map),
99            symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
100            key,
101        })
102    }
103
104    #[pyo3(name = "get_dataset_range")]
105    fn py_get_dataset_range<'py>(
106        &self,
107        py: Python<'py>,
108        dataset: String,
109    ) -> PyResult<Bound<'py, PyAny>> {
110        let client = self.inner.clone();
111
112        pyo3_async_runtimes::tokio::future_into_py(py, async move {
113            let mut client = client.lock().await; // TODO: Use a client pool
114            let response = client.metadata().get_dataset_range(&dataset).await;
115            match response {
116                Ok(res) => Python::with_gil(|py| {
117                    let dict = PyDict::new(py);
118                    dict.set_item("start", res.start.to_string())?;
119                    dict.set_item("end", res.end.to_string())?;
120                    dict.into_py_any(py)
121                }),
122                Err(e) => Err(PyErr::new::<PyException, _>(format!(
123                    "Error handling response: {e}"
124                ))),
125            }
126        })
127    }
128
129    #[allow(clippy::too_many_arguments)]
130    #[pyo3(name = "get_range_instruments")]
131    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, use_exchange_as_venue=false))]
132    fn py_get_range_instruments<'py>(
133        &self,
134        py: Python<'py>,
135        dataset: String,
136        instrument_ids: Vec<InstrumentId>,
137        start: u64,
138        end: Option<u64>,
139        limit: Option<u64>,
140        use_exchange_as_venue: bool,
141    ) -> PyResult<Bound<'py, PyAny>> {
142        let client = self.inner.clone();
143        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
144        let symbols: Vec<String> = instrument_ids
145            .iter()
146            .map(|instrument_id| {
147                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
148            })
149            .collect();
150
151        let stype_in = infer_symbology_type(symbols.first().unwrap());
152        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
153        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
154        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
155        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
156        let params = GetRangeParams::builder()
157            .dataset(dataset)
158            .date_time_range(time_range)
159            .symbols(symbols)
160            .stype_in(stype_in)
161            .schema(dbn::Schema::Definition)
162            .limit(limit.and_then(NonZeroU64::new))
163            .build();
164
165        let publisher_venue_map = self.publisher_venue_map.clone();
166        let symbol_venue_map = self.symbol_venue_map.clone();
167        let ts_init = self.clock.get_time_ns();
168
169        pyo3_async_runtimes::tokio::future_into_py(py, async move {
170            let mut client = client.lock().await; // TODO: Use a client pool
171            let mut decoder = client
172                .timeseries()
173                .get_range(&params)
174                .await
175                .map_err(to_pyvalue_err)?;
176
177            decoder.set_upgrade_policy(dbn::VersionUpgradePolicy::UpgradeToV2);
178
179            let metadata = decoder.metadata().clone();
180            let mut instruments = Vec::new();
181
182            while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183                let record = dbn::RecordRef::from(msg);
184                let mut instrument_id = decode_nautilus_instrument_id(
185                    &record,
186                    &metadata,
187                    &publisher_venue_map,
188                    &symbol_venue_map.read().unwrap(),
189                )
190                .map_err(to_pyvalue_err)?;
191
192                if use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
193                    let exchange = msg.exchange().unwrap();
194                    let venue = Venue::from_code(exchange)
195                        .unwrap_or_else(|_| panic!("`Venue` not found for exchange {exchange}"));
196                    instrument_id.venue = venue;
197                }
198
199                let result = decode_instrument_def_msg(msg, instrument_id, ts_init);
200                match result {
201                    Ok(instrument) => instruments.push(instrument),
202                    Err(e) => tracing::error!("{e:?}"),
203                }
204            }
205
206            Python::with_gil(|py| {
207                let py_results: PyResult<Vec<PyObject>> = instruments
208                    .into_iter()
209                    .map(|result| instrument_any_to_pyobject(py, result))
210                    .collect();
211
212                py_results.map(|objs| {
213                    PyList::new(py, &objs)
214                        .expect("Invalid `ExactSizeIterator`")
215                        .into_py_any_unwrap(py)
216                })
217            })
218        })
219    }
220
221    #[pyo3(name = "get_range_quotes")]
222    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
223    #[allow(clippy::too_many_arguments)]
224    fn py_get_range_quotes<'py>(
225        &self,
226        py: Python<'py>,
227        dataset: String,
228        instrument_ids: Vec<InstrumentId>,
229        start: u64,
230        end: Option<u64>,
231        limit: Option<u64>,
232        price_precision: Option<u8>,
233        schema: Option<String>,
234    ) -> PyResult<Bound<'py, PyAny>> {
235        let client = self.inner.clone();
236        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
237        let symbols: Vec<String> = instrument_ids
238            .iter()
239            .map(|instrument_id| {
240                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
241            })
242            .collect();
243
244        let stype_in = infer_symbology_type(symbols.first().unwrap());
245        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
246        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
247        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
248        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
249        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
250        let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?;
251        match dbn_schema {
252            dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
253            _ => {
254                return Err(to_pyvalue_err(
255                    "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m",
256                ));
257            }
258        }
259        let params = GetRangeParams::builder()
260            .dataset(dataset)
261            .date_time_range(time_range)
262            .symbols(symbols)
263            .stype_in(stype_in)
264            .schema(dbn_schema)
265            .limit(limit.and_then(NonZeroU64::new))
266            .build();
267
268        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
269        let publisher_venue_map = self.publisher_venue_map.clone();
270        let symbol_venue_map = self.symbol_venue_map.clone();
271        let ts_init = self.clock.get_time_ns();
272
273        pyo3_async_runtimes::tokio::future_into_py(py, async move {
274            let mut client = client.lock().await; // TODO: Use a client pool
275            let mut decoder = client
276                .timeseries()
277                .get_range(&params)
278                .await
279                .map_err(to_pyvalue_err)?;
280
281            let metadata = decoder.metadata().clone();
282            let mut result: Vec<QuoteTick> = Vec::new();
283
284            let mut process_record = |record: dbn::RecordRef| -> PyResult<()> {
285                let instrument_id = decode_nautilus_instrument_id(
286                    &record,
287                    &metadata,
288                    &publisher_venue_map,
289                    &symbol_venue_map.read().unwrap(),
290                )
291                .map_err(to_pyvalue_err)?;
292
293                let (data, _) = decode_record(
294                    &record,
295                    instrument_id,
296                    price_precision,
297                    Some(ts_init),
298                    false, // Don't include trades
299                )
300                .map_err(to_pyvalue_err)?;
301
302                match data {
303                    Some(Data::Quote(quote)) => {
304                        result.push(quote);
305                        Ok(())
306                    }
307                    _ => panic!("Invalid data element not `QuoteTick`, was {data:?}"),
308                }
309            };
310
311            match dbn_schema {
312                dbn::Schema::Mbp1 => {
313                    while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
314                        process_record(dbn::RecordRef::from(msg))?;
315                    }
316                }
317                dbn::Schema::Bbo1M => {
318                    while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
319                        process_record(dbn::RecordRef::from(msg))?;
320                    }
321                }
322                dbn::Schema::Bbo1S => {
323                    while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
324                        process_record(dbn::RecordRef::from(msg))?;
325                    }
326                }
327                _ => panic!("Invalid schema {dbn_schema}"),
328            }
329
330            Python::with_gil(|py| result.into_py_any(py))
331        })
332    }
333
334    #[pyo3(name = "get_range_trades")]
335    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
336    #[allow(clippy::too_many_arguments)]
337    fn py_get_range_trades<'py>(
338        &self,
339        py: Python<'py>,
340        dataset: String,
341        instrument_ids: Vec<InstrumentId>,
342        start: u64,
343        end: Option<u64>,
344        limit: Option<u64>,
345        price_precision: Option<u8>,
346    ) -> PyResult<Bound<'py, PyAny>> {
347        let client = self.inner.clone();
348        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
349        let symbols: Vec<String> = instrument_ids
350            .iter()
351            .map(|instrument_id| {
352                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
353            })
354            .collect();
355
356        let stype_in = infer_symbology_type(symbols.first().unwrap());
357        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
358        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
359        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
360        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
361        let params = GetRangeParams::builder()
362            .dataset(dataset)
363            .date_time_range(time_range)
364            .symbols(symbols)
365            .stype_in(stype_in)
366            .schema(dbn::Schema::Trades)
367            .limit(limit.and_then(NonZeroU64::new))
368            .build();
369
370        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
371        let publisher_venue_map = self.publisher_venue_map.clone();
372        let symbol_venue_map = self.symbol_venue_map.clone();
373        let ts_init = self.clock.get_time_ns();
374
375        pyo3_async_runtimes::tokio::future_into_py(py, async move {
376            let mut client = client.lock().await; // TODO: Use a client pool
377            let mut decoder = client
378                .timeseries()
379                .get_range(&params)
380                .await
381                .map_err(to_pyvalue_err)?;
382
383            let metadata = decoder.metadata().clone();
384            let mut result: Vec<TradeTick> = Vec::new();
385
386            while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
387                let record = dbn::RecordRef::from(msg);
388                let instrument_id = decode_nautilus_instrument_id(
389                    &record,
390                    &metadata,
391                    &publisher_venue_map,
392                    &symbol_venue_map.read().unwrap(),
393                )
394                .map_err(to_pyvalue_err)?;
395
396                let (data, _) = decode_record(
397                    &record,
398                    instrument_id,
399                    price_precision,
400                    Some(ts_init),
401                    false, // Not applicable (trade will be decoded regardless)
402                )
403                .map_err(to_pyvalue_err)?;
404
405                match data {
406                    Some(Data::Trade(trade)) => {
407                        result.push(trade);
408                    }
409                    _ => panic!("Invalid data element not `TradeTick`, was {data:?}"),
410                }
411            }
412
413            Python::with_gil(|py| result.into_py_any(py))
414        })
415    }
416
417    #[pyo3(name = "get_range_bars")]
418    #[allow(clippy::too_many_arguments)]
419    #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None))]
420    fn py_get_range_bars<'py>(
421        &self,
422        py: Python<'py>,
423        dataset: String,
424        instrument_ids: Vec<InstrumentId>,
425        aggregation: BarAggregation,
426        start: u64,
427        end: Option<u64>,
428        limit: Option<u64>,
429        price_precision: Option<u8>,
430    ) -> PyResult<Bound<'py, PyAny>> {
431        let client = self.inner.clone();
432        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
433        let symbols: Vec<String> = instrument_ids
434            .iter()
435            .map(|instrument_id| {
436                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
437            })
438            .collect();
439
440        let stype_in = infer_symbology_type(symbols.first().unwrap());
441        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
442        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
443        let schema = match aggregation {
444            BarAggregation::Second => dbn::Schema::Ohlcv1S,
445            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
446            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
447            BarAggregation::Day => dbn::Schema::Ohlcv1D,
448            _ => panic!("Invalid `BarAggregation` for request, was {aggregation}"),
449        };
450        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
451        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
452        let params = GetRangeParams::builder()
453            .dataset(dataset)
454            .date_time_range(time_range)
455            .symbols(symbols)
456            .stype_in(stype_in)
457            .schema(schema)
458            .limit(limit.and_then(NonZeroU64::new))
459            .build();
460
461        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
462        let publisher_venue_map = self.publisher_venue_map.clone();
463        let symbol_venue_map = self.symbol_venue_map.clone();
464        let ts_init = self.clock.get_time_ns();
465
466        pyo3_async_runtimes::tokio::future_into_py(py, async move {
467            let mut client = client.lock().await; // TODO: Use a client pool
468            let mut decoder = client
469                .timeseries()
470                .get_range(&params)
471                .await
472                .map_err(to_pyvalue_err)?;
473
474            let metadata = decoder.metadata().clone();
475            let mut result: Vec<Bar> = Vec::new();
476
477            while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
478                let record = dbn::RecordRef::from(msg);
479                let instrument_id = decode_nautilus_instrument_id(
480                    &record,
481                    &metadata,
482                    &publisher_venue_map,
483                    &symbol_venue_map.read().unwrap(),
484                )
485                .map_err(to_pyvalue_err)?;
486
487                let (data, _) = decode_record(
488                    &record,
489                    instrument_id,
490                    price_precision,
491                    Some(ts_init),
492                    false, // Not applicable
493                )
494                .map_err(to_pyvalue_err)?;
495
496                match data {
497                    Some(Data::Bar(bar)) => {
498                        result.push(bar);
499                    }
500                    _ => panic!("Invalid data element not `Bar`, was {data:?}"),
501                }
502            }
503
504            Python::with_gil(|py| result.into_py_any(py))
505        })
506    }
507
508    #[pyo3(name = "get_range_imbalance")]
509    #[allow(clippy::too_many_arguments)]
510    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
511    fn py_get_range_imbalance<'py>(
512        &self,
513        py: Python<'py>,
514        dataset: String,
515        instrument_ids: Vec<InstrumentId>,
516        start: u64,
517        end: Option<u64>,
518        limit: Option<u64>,
519        price_precision: Option<u8>,
520    ) -> PyResult<Bound<'py, PyAny>> {
521        let client = self.inner.clone();
522        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
523        let symbols: Vec<String> = instrument_ids
524            .iter()
525            .map(|instrument_id| {
526                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
527            })
528            .collect();
529
530        let stype_in = infer_symbology_type(symbols.first().unwrap());
531        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
532        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
533        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
534        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
535        let params = GetRangeParams::builder()
536            .dataset(dataset)
537            .date_time_range(time_range)
538            .symbols(symbols)
539            .stype_in(stype_in)
540            .schema(dbn::Schema::Imbalance)
541            .limit(limit.and_then(NonZeroU64::new))
542            .build();
543
544        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
545        let publisher_venue_map = self.publisher_venue_map.clone();
546        let symbol_venue_map = self.symbol_venue_map.clone();
547        let ts_init = self.clock.get_time_ns();
548
549        pyo3_async_runtimes::tokio::future_into_py(py, async move {
550            let mut client = client.lock().await; // TODO: Use a client pool
551            let mut decoder = client
552                .timeseries()
553                .get_range(&params)
554                .await
555                .map_err(to_pyvalue_err)?;
556
557            let metadata = decoder.metadata().clone();
558            let mut result: Vec<DatabentoImbalance> = Vec::new();
559
560            while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
561                let record = dbn::RecordRef::from(msg);
562                let instrument_id = decode_nautilus_instrument_id(
563                    &record,
564                    &metadata,
565                    &publisher_venue_map,
566                    &symbol_venue_map.read().unwrap(),
567                )
568                .map_err(to_pyvalue_err)?;
569
570                let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
571                    .map_err(to_pyvalue_err)?;
572
573                result.push(imbalance);
574            }
575
576            Python::with_gil(|py| result.into_py_any(py))
577        })
578    }
579
580    #[pyo3(name = "get_range_statistics")]
581    #[allow(clippy::too_many_arguments)]
582    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
583    fn py_get_range_statistics<'py>(
584        &self,
585        py: Python<'py>,
586        dataset: String,
587        instrument_ids: Vec<InstrumentId>,
588        start: u64,
589        end: Option<u64>,
590        limit: Option<u64>,
591        price_precision: Option<u8>,
592    ) -> PyResult<Bound<'py, PyAny>> {
593        let client = self.inner.clone();
594        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
595        let symbols: Vec<String> = instrument_ids
596            .iter()
597            .map(|instrument_id| {
598                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
599            })
600            .collect();
601
602        let stype_in = infer_symbology_type(symbols.first().unwrap());
603        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
604        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
605        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
606        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
607        let params = GetRangeParams::builder()
608            .dataset(dataset)
609            .date_time_range(time_range)
610            .symbols(symbols)
611            .stype_in(stype_in)
612            .schema(dbn::Schema::Statistics)
613            .limit(limit.and_then(NonZeroU64::new))
614            .build();
615
616        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
617        let publisher_venue_map = self.publisher_venue_map.clone();
618        let symbol_venue_map = self.symbol_venue_map.clone();
619        let ts_init = self.clock.get_time_ns();
620
621        pyo3_async_runtimes::tokio::future_into_py(py, async move {
622            let mut client = client.lock().await; // TODO: Use a client pool
623            let mut decoder = client
624                .timeseries()
625                .get_range(&params)
626                .await
627                .map_err(to_pyvalue_err)?;
628
629            let metadata = decoder.metadata().clone();
630            let mut result: Vec<DatabentoStatistics> = Vec::new();
631
632            while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
633                let record = dbn::RecordRef::from(msg);
634                let instrument_id = decode_nautilus_instrument_id(
635                    &record,
636                    &metadata,
637                    &publisher_venue_map,
638                    &symbol_venue_map.read().unwrap(),
639                )
640                .map_err(to_pyvalue_err)?;
641
642                let statistics =
643                    decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
644                        .map_err(to_pyvalue_err)?;
645
646                result.push(statistics);
647            }
648
649            Python::with_gil(|py| result.into_py_any(py))
650        })
651    }
652
653    #[pyo3(name = "get_range_status")]
654    #[allow(clippy::too_many_arguments)]
655    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
656    fn py_get_range_status<'py>(
657        &self,
658        py: Python<'py>,
659        dataset: String,
660        instrument_ids: Vec<InstrumentId>,
661        start: u64,
662        end: Option<u64>,
663        limit: Option<u64>,
664    ) -> PyResult<Bound<'py, PyAny>> {
665        let client = self.inner.clone();
666        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
667        let symbols: Vec<String> = instrument_ids
668            .iter()
669            .map(|instrument_id| {
670                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
671            })
672            .collect();
673
674        let stype_in = infer_symbology_type(symbols.first().unwrap());
675        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
676        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
677        let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
678        let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
679        let params = GetRangeParams::builder()
680            .dataset(dataset)
681            .date_time_range(time_range)
682            .symbols(symbols)
683            .stype_in(stype_in)
684            .schema(dbn::Schema::Status)
685            .limit(limit.and_then(NonZeroU64::new))
686            .build();
687
688        let publisher_venue_map = self.publisher_venue_map.clone();
689        let ts_init = self.clock.get_time_ns();
690        let symbol_venue_map = self.symbol_venue_map.clone();
691
692        pyo3_async_runtimes::tokio::future_into_py(py, async move {
693            let mut client = client.lock().await; // TODO: Use a client pool
694            let mut decoder = client
695                .timeseries()
696                .get_range(&params)
697                .await
698                .map_err(to_pyvalue_err)?;
699
700            let metadata = decoder.metadata().clone();
701            let mut result: Vec<InstrumentStatus> = Vec::new();
702
703            while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
704                let record = dbn::RecordRef::from(msg);
705                let instrument_id = decode_nautilus_instrument_id(
706                    &record,
707                    &metadata,
708                    &publisher_venue_map,
709                    &symbol_venue_map.read().unwrap(),
710                )
711                .map_err(to_pyvalue_err)?;
712
713                let status =
714                    decode_status_msg(msg, instrument_id, ts_init).map_err(to_pyvalue_err)?;
715
716                result.push(status);
717            }
718
719            Python::with_gil(|py| result.into_py_any(py))
720        })
721    }
722}