1use std::{
17 collections::HashMap,
18 fs,
19 num::NonZeroU64,
20 path::PathBuf,
21 str::FromStr,
22 sync::{Arc, RwLock},
23};
24
25use databento::{
26 dbn::{self},
27 historical::timeseries::GetRangeParams,
28};
29use indexmap::IndexMap;
30use nautilus_core::{
31 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
32 time::{AtomicTime, get_atomic_clock_realtime},
33};
34use nautilus_model::{
35 data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
36 enums::BarAggregation,
37 identifiers::{InstrumentId, Symbol, Venue},
38 python::instruments::instrument_any_to_pyobject,
39 types::Currency,
40};
41use pyo3::{
42 IntoPyObjectExt,
43 exceptions::PyException,
44 prelude::*,
45 types::{PyDict, PyList},
46};
47use tokio::sync::Mutex;
48
49use crate::{
50 common::get_date_time_range,
51 decode::{
52 decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
53 decode_status_msg,
54 },
55 symbology::{
56 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
57 infer_symbology_type, instrument_id_to_symbol_string,
58 },
59 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
60};
61
62#[cfg_attr(
63 feature = "python",
64 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
65)]
66#[derive(Debug)]
67pub struct DatabentoHistoricalClient {
68 #[pyo3(get)]
69 pub key: String,
70 clock: &'static AtomicTime,
71 inner: Arc<Mutex<databento::HistoricalClient>>,
72 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
73 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
74 use_exchange_as_venue: bool,
75}
76
77#[pymethods]
78impl DatabentoHistoricalClient {
79 #[new]
80 fn py_new(
81 key: String,
82 publishers_filepath: PathBuf,
83 use_exchange_as_venue: bool,
84 ) -> PyResult<Self> {
85 let client = databento::HistoricalClient::builder()
86 .key(key.clone())
87 .map_err(to_pyvalue_err)?
88 .build()
89 .map_err(to_pyvalue_err)?;
90
91 let file_content = fs::read_to_string(publishers_filepath)?;
92 let publishers_vec: Vec<DatabentoPublisher> =
93 serde_json::from_str(&file_content).map_err(to_pyvalue_err)?;
94
95 let publisher_venue_map = publishers_vec
96 .into_iter()
97 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
98 .collect::<IndexMap<u16, Venue>>();
99
100 Ok(Self {
101 clock: get_atomic_clock_realtime(),
102 inner: Arc::new(Mutex::new(client)),
103 publisher_venue_map: Arc::new(publisher_venue_map),
104 symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
105 key,
106 use_exchange_as_venue,
107 })
108 }
109
110 #[pyo3(name = "get_dataset_range")]
111 fn py_get_dataset_range<'py>(
112 &self,
113 py: Python<'py>,
114 dataset: String,
115 ) -> PyResult<Bound<'py, PyAny>> {
116 let client = self.inner.clone();
117
118 pyo3_async_runtimes::tokio::future_into_py(py, async move {
119 let mut client = client.lock().await; let response = client.metadata().get_dataset_range(&dataset).await;
121 match response {
122 Ok(res) => Python::with_gil(|py| {
123 let dict = PyDict::new(py);
124 dict.set_item("start", res.start.to_string())?;
125 dict.set_item("end", res.end.to_string())?;
126 dict.into_py_any(py)
127 }),
128 Err(e) => Err(PyErr::new::<PyException, _>(format!(
129 "Error handling response: {e}"
130 ))),
131 }
132 })
133 }
134
135 #[pyo3(name = "get_range_instruments")]
136 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
137 #[allow(clippy::too_many_arguments)]
138 fn py_get_range_instruments<'py>(
139 &self,
140 py: Python<'py>,
141 dataset: String,
142 instrument_ids: Vec<InstrumentId>,
143 start: u64,
144 end: Option<u64>,
145 limit: Option<u64>,
146 ) -> PyResult<Bound<'py, PyAny>> {
147 let client = self.inner.clone();
148 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
149 let symbols: Vec<String> = instrument_ids
150 .iter()
151 .map(|instrument_id| {
152 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
153 })
154 .collect();
155
156 let stype_in = infer_symbology_type(symbols.first().unwrap());
157 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
158 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
159 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
160 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
161 let params = GetRangeParams::builder()
162 .dataset(dataset)
163 .date_time_range(time_range)
164 .symbols(symbols)
165 .stype_in(stype_in)
166 .schema(dbn::Schema::Definition)
167 .limit(limit.and_then(NonZeroU64::new))
168 .build();
169
170 let publisher_venue_map = self.publisher_venue_map.clone();
171 let symbol_venue_map = self.symbol_venue_map.clone();
172 let ts_init = self.clock.get_time_ns();
173 let use_exchange_as_venue = self.use_exchange_as_venue;
174
175 pyo3_async_runtimes::tokio::future_into_py(py, async move {
176 let mut client = client.lock().await; let mut decoder = client
178 .timeseries()
179 .get_range(¶ms)
180 .await
181 .map_err(to_pyvalue_err)?;
182
183 decoder.set_upgrade_policy(dbn::VersionUpgradePolicy::UpgradeToV2);
184
185 let metadata = decoder.metadata().clone();
186 let mut metadata_cache = MetadataCache::new(metadata);
187 let mut instruments = Vec::new();
188
189 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
190 let record = dbn::RecordRef::from(msg);
191 let mut instrument_id = decode_nautilus_instrument_id(
192 &record,
193 &mut metadata_cache,
194 &publisher_venue_map,
195 &symbol_venue_map.read().unwrap(),
196 )
197 .map_err(to_pyvalue_err)?;
198
199 if use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
200 let exchange = msg.exchange().unwrap();
201 let venue = Venue::from_code(exchange)
202 .unwrap_or_else(|_| panic!("`Venue` not found for exchange {exchange}"));
203 instrument_id.venue = venue;
204 }
205
206 let result = decode_instrument_def_msg(msg, instrument_id, ts_init);
207 match result {
208 Ok(instrument) => instruments.push(instrument),
209 Err(e) => tracing::error!("{e:?}"),
210 }
211 }
212
213 Python::with_gil(|py| {
214 let py_results: PyResult<Vec<PyObject>> = instruments
215 .into_iter()
216 .map(|result| instrument_any_to_pyobject(py, result))
217 .collect();
218
219 py_results.map(|objs| {
220 PyList::new(py, &objs)
221 .expect("Invalid `ExactSizeIterator`")
222 .into_py_any_unwrap(py)
223 })
224 })
225 })
226 }
227
228 #[pyo3(name = "get_range_quotes")]
229 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
230 #[allow(clippy::too_many_arguments)]
231 fn py_get_range_quotes<'py>(
232 &self,
233 py: Python<'py>,
234 dataset: String,
235 instrument_ids: Vec<InstrumentId>,
236 start: u64,
237 end: Option<u64>,
238 limit: Option<u64>,
239 price_precision: Option<u8>,
240 schema: Option<String>,
241 ) -> PyResult<Bound<'py, PyAny>> {
242 let client = self.inner.clone();
243 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
244 let symbols: Vec<String> = instrument_ids
245 .iter()
246 .map(|instrument_id| {
247 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
248 })
249 .collect();
250
251 let stype_in = infer_symbology_type(symbols.first().unwrap());
252 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
253 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
254 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
255 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
256 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
257 let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?;
258 match dbn_schema {
259 dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
260 _ => {
261 return Err(to_pyvalue_err(
262 "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m",
263 ));
264 }
265 }
266 let params = GetRangeParams::builder()
267 .dataset(dataset)
268 .date_time_range(time_range)
269 .symbols(symbols)
270 .stype_in(stype_in)
271 .schema(dbn_schema)
272 .limit(limit.and_then(NonZeroU64::new))
273 .build();
274
275 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
276 let publisher_venue_map = self.publisher_venue_map.clone();
277 let symbol_venue_map = self.symbol_venue_map.clone();
278 let ts_init = self.clock.get_time_ns();
279
280 pyo3_async_runtimes::tokio::future_into_py(py, async move {
281 let mut client = client.lock().await; let mut decoder = client
283 .timeseries()
284 .get_range(¶ms)
285 .await
286 .map_err(to_pyvalue_err)?;
287
288 let metadata = decoder.metadata().clone();
289 let mut metadata_cache = MetadataCache::new(metadata);
290 let mut result: Vec<QuoteTick> = Vec::new();
291
292 let mut process_record = |record: dbn::RecordRef| -> PyResult<()> {
293 let instrument_id = decode_nautilus_instrument_id(
294 &record,
295 &mut metadata_cache,
296 &publisher_venue_map,
297 &symbol_venue_map.read().unwrap(),
298 )
299 .map_err(to_pyvalue_err)?;
300
301 let (data, _) = decode_record(
302 &record,
303 instrument_id,
304 price_precision,
305 Some(ts_init),
306 false, )
308 .map_err(to_pyvalue_err)?;
309
310 match data {
311 Some(Data::Quote(quote)) => {
312 result.push(quote);
313 Ok(())
314 }
315 _ => panic!("Invalid data element not `QuoteTick`, was {data:?}"),
316 }
317 };
318
319 match dbn_schema {
320 dbn::Schema::Mbp1 => {
321 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
322 process_record(dbn::RecordRef::from(msg))?;
323 }
324 }
325 dbn::Schema::Bbo1M => {
326 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
327 process_record(dbn::RecordRef::from(msg))?;
328 }
329 }
330 dbn::Schema::Bbo1S => {
331 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
332 process_record(dbn::RecordRef::from(msg))?;
333 }
334 }
335 _ => panic!("Invalid schema {dbn_schema}"),
336 }
337
338 Python::with_gil(|py| result.into_py_any(py))
339 })
340 }
341
342 #[pyo3(name = "get_range_trades")]
343 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
344 #[allow(clippy::too_many_arguments)]
345 fn py_get_range_trades<'py>(
346 &self,
347 py: Python<'py>,
348 dataset: String,
349 instrument_ids: Vec<InstrumentId>,
350 start: u64,
351 end: Option<u64>,
352 limit: Option<u64>,
353 price_precision: Option<u8>,
354 ) -> PyResult<Bound<'py, PyAny>> {
355 let client = self.inner.clone();
356 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
357 let symbols: Vec<String> = instrument_ids
358 .iter()
359 .map(|instrument_id| {
360 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
361 })
362 .collect();
363
364 let stype_in = infer_symbology_type(symbols.first().unwrap());
365 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
366 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
367 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
368 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
369 let params = GetRangeParams::builder()
370 .dataset(dataset)
371 .date_time_range(time_range)
372 .symbols(symbols)
373 .stype_in(stype_in)
374 .schema(dbn::Schema::Trades)
375 .limit(limit.and_then(NonZeroU64::new))
376 .build();
377
378 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
379 let publisher_venue_map = self.publisher_venue_map.clone();
380 let symbol_venue_map = self.symbol_venue_map.clone();
381 let ts_init = self.clock.get_time_ns();
382
383 pyo3_async_runtimes::tokio::future_into_py(py, async move {
384 let mut client = client.lock().await; let mut decoder = client
386 .timeseries()
387 .get_range(¶ms)
388 .await
389 .map_err(to_pyvalue_err)?;
390
391 let metadata = decoder.metadata().clone();
392 let mut metadata_cache = MetadataCache::new(metadata);
393 let mut result: Vec<TradeTick> = Vec::new();
394
395 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
396 let record = dbn::RecordRef::from(msg);
397 let instrument_id = decode_nautilus_instrument_id(
398 &record,
399 &mut metadata_cache,
400 &publisher_venue_map,
401 &symbol_venue_map.read().unwrap(),
402 )
403 .map_err(to_pyvalue_err)?;
404
405 let (data, _) = decode_record(
406 &record,
407 instrument_id,
408 price_precision,
409 Some(ts_init),
410 false, )
412 .map_err(to_pyvalue_err)?;
413
414 match data {
415 Some(Data::Trade(trade)) => {
416 result.push(trade);
417 }
418 _ => panic!("Invalid data element not `TradeTick`, was {data:?}"),
419 }
420 }
421
422 Python::with_gil(|py| result.into_py_any(py))
423 })
424 }
425
426 #[pyo3(name = "get_range_bars")]
427 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None))]
428 #[allow(clippy::too_many_arguments)]
429 fn py_get_range_bars<'py>(
430 &self,
431 py: Python<'py>,
432 dataset: String,
433 instrument_ids: Vec<InstrumentId>,
434 aggregation: BarAggregation,
435 start: u64,
436 end: Option<u64>,
437 limit: Option<u64>,
438 price_precision: Option<u8>,
439 ) -> PyResult<Bound<'py, PyAny>> {
440 let client = self.inner.clone();
441 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
442 let symbols: Vec<String> = instrument_ids
443 .iter()
444 .map(|instrument_id| {
445 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
446 })
447 .collect();
448
449 let stype_in = infer_symbology_type(symbols.first().unwrap());
450 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
451 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
452 let schema = match aggregation {
453 BarAggregation::Second => dbn::Schema::Ohlcv1S,
454 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
455 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
456 BarAggregation::Day => dbn::Schema::Ohlcv1D,
457 _ => panic!("Invalid `BarAggregation` for request, was {aggregation}"),
458 };
459 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
460 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
461 let params = GetRangeParams::builder()
462 .dataset(dataset)
463 .date_time_range(time_range)
464 .symbols(symbols)
465 .stype_in(stype_in)
466 .schema(schema)
467 .limit(limit.and_then(NonZeroU64::new))
468 .build();
469
470 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
471 let publisher_venue_map = self.publisher_venue_map.clone();
472 let symbol_venue_map = self.symbol_venue_map.clone();
473 let ts_init = self.clock.get_time_ns();
474
475 pyo3_async_runtimes::tokio::future_into_py(py, async move {
476 let mut client = client.lock().await; let mut decoder = client
478 .timeseries()
479 .get_range(¶ms)
480 .await
481 .map_err(to_pyvalue_err)?;
482
483 let metadata = decoder.metadata().clone();
484 let mut metadata_cache = MetadataCache::new(metadata);
485 let mut result: Vec<Bar> = Vec::new();
486
487 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
488 let record = dbn::RecordRef::from(msg);
489 let instrument_id = decode_nautilus_instrument_id(
490 &record,
491 &mut metadata_cache,
492 &publisher_venue_map,
493 &symbol_venue_map.read().unwrap(),
494 )
495 .map_err(to_pyvalue_err)?;
496
497 let (data, _) = decode_record(
498 &record,
499 instrument_id,
500 price_precision,
501 Some(ts_init),
502 false, )
504 .map_err(to_pyvalue_err)?;
505
506 match data {
507 Some(Data::Bar(bar)) => {
508 result.push(bar);
509 }
510 _ => panic!("Invalid data element not `Bar`, was {data:?}"),
511 }
512 }
513
514 Python::with_gil(|py| result.into_py_any(py))
515 })
516 }
517
518 #[pyo3(name = "get_range_imbalance")]
519 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
520 #[allow(clippy::too_many_arguments)]
521 fn py_get_range_imbalance<'py>(
522 &self,
523 py: Python<'py>,
524 dataset: String,
525 instrument_ids: Vec<InstrumentId>,
526 start: u64,
527 end: Option<u64>,
528 limit: Option<u64>,
529 price_precision: Option<u8>,
530 ) -> PyResult<Bound<'py, PyAny>> {
531 let client = self.inner.clone();
532 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
533 let symbols: Vec<String> = instrument_ids
534 .iter()
535 .map(|instrument_id| {
536 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
537 })
538 .collect();
539
540 let stype_in = infer_symbology_type(symbols.first().unwrap());
541 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
542 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
543 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
544 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
545 let params = GetRangeParams::builder()
546 .dataset(dataset)
547 .date_time_range(time_range)
548 .symbols(symbols)
549 .stype_in(stype_in)
550 .schema(dbn::Schema::Imbalance)
551 .limit(limit.and_then(NonZeroU64::new))
552 .build();
553
554 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
555 let publisher_venue_map = self.publisher_venue_map.clone();
556 let symbol_venue_map = self.symbol_venue_map.clone();
557 let ts_init = self.clock.get_time_ns();
558
559 pyo3_async_runtimes::tokio::future_into_py(py, async move {
560 let mut client = client.lock().await; let mut decoder = client
562 .timeseries()
563 .get_range(¶ms)
564 .await
565 .map_err(to_pyvalue_err)?;
566
567 let metadata = decoder.metadata().clone();
568 let mut metadata_cache = MetadataCache::new(metadata);
569 let mut result: Vec<DatabentoImbalance> = Vec::new();
570
571 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
572 let record = dbn::RecordRef::from(msg);
573 let instrument_id = decode_nautilus_instrument_id(
574 &record,
575 &mut metadata_cache,
576 &publisher_venue_map,
577 &symbol_venue_map.read().unwrap(),
578 )
579 .map_err(to_pyvalue_err)?;
580
581 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
582 .map_err(to_pyvalue_err)?;
583
584 result.push(imbalance);
585 }
586
587 Python::with_gil(|py| result.into_py_any(py))
588 })
589 }
590
591 #[pyo3(name = "get_range_statistics")]
592 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
593 #[allow(clippy::too_many_arguments)]
594 fn py_get_range_statistics<'py>(
595 &self,
596 py: Python<'py>,
597 dataset: String,
598 instrument_ids: Vec<InstrumentId>,
599 start: u64,
600 end: Option<u64>,
601 limit: Option<u64>,
602 price_precision: Option<u8>,
603 ) -> PyResult<Bound<'py, PyAny>> {
604 let client = self.inner.clone();
605 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
606 let symbols: Vec<String> = instrument_ids
607 .iter()
608 .map(|instrument_id| {
609 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
610 })
611 .collect();
612
613 let stype_in = infer_symbology_type(symbols.first().unwrap());
614 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
615 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
616 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
617 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
618 let params = GetRangeParams::builder()
619 .dataset(dataset)
620 .date_time_range(time_range)
621 .symbols(symbols)
622 .stype_in(stype_in)
623 .schema(dbn::Schema::Statistics)
624 .limit(limit.and_then(NonZeroU64::new))
625 .build();
626
627 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
628 let publisher_venue_map = self.publisher_venue_map.clone();
629 let symbol_venue_map = self.symbol_venue_map.clone();
630 let ts_init = self.clock.get_time_ns();
631
632 pyo3_async_runtimes::tokio::future_into_py(py, async move {
633 let mut client = client.lock().await; let mut decoder = client
635 .timeseries()
636 .get_range(¶ms)
637 .await
638 .map_err(to_pyvalue_err)?;
639
640 let metadata = decoder.metadata().clone();
641 let mut metadata_cache = MetadataCache::new(metadata);
642 let mut result: Vec<DatabentoStatistics> = Vec::new();
643
644 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
645 let record = dbn::RecordRef::from(msg);
646 let instrument_id = decode_nautilus_instrument_id(
647 &record,
648 &mut metadata_cache,
649 &publisher_venue_map,
650 &symbol_venue_map.read().unwrap(),
651 )
652 .map_err(to_pyvalue_err)?;
653
654 let statistics =
655 decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
656 .map_err(to_pyvalue_err)?;
657
658 result.push(statistics);
659 }
660
661 Python::with_gil(|py| result.into_py_any(py))
662 })
663 }
664
665 #[pyo3(name = "get_range_status")]
666 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
667 #[allow(clippy::too_many_arguments)]
668 fn py_get_range_status<'py>(
669 &self,
670 py: Python<'py>,
671 dataset: String,
672 instrument_ids: Vec<InstrumentId>,
673 start: u64,
674 end: Option<u64>,
675 limit: Option<u64>,
676 ) -> PyResult<Bound<'py, PyAny>> {
677 let client = self.inner.clone();
678 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
679 let symbols: Vec<String> = instrument_ids
680 .iter()
681 .map(|instrument_id| {
682 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
683 })
684 .collect();
685
686 let stype_in = infer_symbology_type(symbols.first().unwrap());
687 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
688 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
689 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
690 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
691 let params = GetRangeParams::builder()
692 .dataset(dataset)
693 .date_time_range(time_range)
694 .symbols(symbols)
695 .stype_in(stype_in)
696 .schema(dbn::Schema::Status)
697 .limit(limit.and_then(NonZeroU64::new))
698 .build();
699
700 let publisher_venue_map = self.publisher_venue_map.clone();
701 let ts_init = self.clock.get_time_ns();
702 let symbol_venue_map = self.symbol_venue_map.clone();
703
704 pyo3_async_runtimes::tokio::future_into_py(py, async move {
705 let mut client = client.lock().await; let mut decoder = client
707 .timeseries()
708 .get_range(¶ms)
709 .await
710 .map_err(to_pyvalue_err)?;
711
712 let metadata = decoder.metadata().clone();
713 let mut metadata_cache = MetadataCache::new(metadata);
714 let mut result: Vec<InstrumentStatus> = Vec::new();
715
716 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
717 let record = dbn::RecordRef::from(msg);
718 let instrument_id = decode_nautilus_instrument_id(
719 &record,
720 &mut metadata_cache,
721 &publisher_venue_map,
722 &symbol_venue_map.read().unwrap(),
723 )
724 .map_err(to_pyvalue_err)?;
725
726 let status =
727 decode_status_msg(msg, instrument_id, ts_init).map_err(to_pyvalue_err)?;
728
729 result.push(status);
730 }
731
732 Python::with_gil(|py| result.into_py_any(py))
733 })
734 }
735}