1use std::{
17 collections::HashMap,
18 fs,
19 num::NonZeroU64,
20 path::PathBuf,
21 str::FromStr,
22 sync::{Arc, RwLock},
23};
24
25use databento::{
26 dbn::{self},
27 historical::timeseries::GetRangeParams,
28};
29use indexmap::IndexMap;
30use nautilus_core::{
31 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
32 time::{AtomicTime, get_atomic_clock_realtime},
33};
34use nautilus_model::{
35 data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
36 enums::BarAggregation,
37 identifiers::{InstrumentId, Symbol, Venue},
38 python::instruments::instrument_any_to_pyobject,
39 types::Currency,
40};
41use pyo3::{
42 IntoPyObjectExt,
43 exceptions::PyException,
44 prelude::*,
45 types::{PyDict, PyList},
46};
47use tokio::sync::Mutex;
48
49use crate::{
50 common::get_date_time_range,
51 decode::{
52 decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
53 decode_status_msg,
54 },
55 symbology::{
56 check_consistent_symbology, decode_nautilus_instrument_id, infer_symbology_type,
57 instrument_id_to_symbol_string,
58 },
59 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
60};
61
/// Client for requesting historical market data from Databento.
///
/// Wraps a `databento::HistoricalClient` together with the lookup maps used to
/// resolve venues for the decoded records.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[derive(Debug)]
pub struct DatabentoHistoricalClient {
    /// API key the underlying client was built with (readable via `#[pyo3(get)]`).
    #[pyo3(get)]
    pub key: String,
    /// Clock used to stamp `ts_init` and to default the `end` of a request range.
    clock: &'static AtomicTime,
    /// Underlying Databento client; the async mutex is locked for the duration of each request.
    inner: Arc<Mutex<databento::HistoricalClient>>,
    /// Publisher ID to venue mapping, built in the constructor from the publishers file.
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol to venue mapping; passed mutably to `instrument_id_to_symbol_string` when a
    /// request is prepared and read again while the response records are decoded.
    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
}
75
76#[pymethods]
77impl DatabentoHistoricalClient {
78 #[new]
79 fn py_new(key: String, publishers_filepath: PathBuf) -> PyResult<Self> {
80 let client = databento::HistoricalClient::builder()
81 .key(key.clone())
82 .map_err(to_pyvalue_err)?
83 .build()
84 .map_err(to_pyvalue_err)?;
85
86 let file_content = fs::read_to_string(publishers_filepath)?;
87 let publishers_vec: Vec<DatabentoPublisher> =
88 serde_json::from_str(&file_content).map_err(to_pyvalue_err)?;
89
90 let publisher_venue_map = publishers_vec
91 .into_iter()
92 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
93 .collect::<IndexMap<u16, Venue>>();
94
95 Ok(Self {
96 clock: get_atomic_clock_realtime(),
97 inner: Arc::new(Mutex::new(client)),
98 publisher_venue_map: Arc::new(publisher_venue_map),
99 symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
100 key,
101 })
102 }
103
104 #[pyo3(name = "get_dataset_range")]
105 fn py_get_dataset_range<'py>(
106 &self,
107 py: Python<'py>,
108 dataset: String,
109 ) -> PyResult<Bound<'py, PyAny>> {
110 let client = self.inner.clone();
111
112 pyo3_async_runtimes::tokio::future_into_py(py, async move {
113 let mut client = client.lock().await; let response = client.metadata().get_dataset_range(&dataset).await;
115 match response {
116 Ok(res) => Python::with_gil(|py| {
117 let dict = PyDict::new(py);
118 dict.set_item("start", res.start.to_string())?;
119 dict.set_item("end", res.end.to_string())?;
120 dict.into_py_any(py)
121 }),
122 Err(e) => Err(PyErr::new::<PyException, _>(format!(
123 "Error handling response: {e}"
124 ))),
125 }
126 })
127 }
128
129 #[allow(clippy::too_many_arguments)]
130 #[pyo3(name = "get_range_instruments")]
131 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, use_exchange_as_venue=false))]
132 fn py_get_range_instruments<'py>(
133 &self,
134 py: Python<'py>,
135 dataset: String,
136 instrument_ids: Vec<InstrumentId>,
137 start: u64,
138 end: Option<u64>,
139 limit: Option<u64>,
140 use_exchange_as_venue: bool,
141 ) -> PyResult<Bound<'py, PyAny>> {
142 let client = self.inner.clone();
143 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
144 let symbols: Vec<String> = instrument_ids
145 .iter()
146 .map(|instrument_id| {
147 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
148 })
149 .collect();
150
151 let stype_in = infer_symbology_type(symbols.first().unwrap());
152 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
153 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
154 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
155 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
156 let params = GetRangeParams::builder()
157 .dataset(dataset)
158 .date_time_range(time_range)
159 .symbols(symbols)
160 .stype_in(stype_in)
161 .schema(dbn::Schema::Definition)
162 .limit(limit.and_then(NonZeroU64::new))
163 .build();
164
165 let publisher_venue_map = self.publisher_venue_map.clone();
166 let symbol_venue_map = self.symbol_venue_map.clone();
167 let ts_init = self.clock.get_time_ns();
168
169 pyo3_async_runtimes::tokio::future_into_py(py, async move {
170 let mut client = client.lock().await; let mut decoder = client
172 .timeseries()
173 .get_range(¶ms)
174 .await
175 .map_err(to_pyvalue_err)?;
176
177 decoder.set_upgrade_policy(dbn::VersionUpgradePolicy::UpgradeToV2);
178
179 let metadata = decoder.metadata().clone();
180 let mut instruments = Vec::new();
181
182 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183 let record = dbn::RecordRef::from(msg);
184 let mut instrument_id = decode_nautilus_instrument_id(
185 &record,
186 &metadata,
187 &publisher_venue_map,
188 &symbol_venue_map.read().unwrap(),
189 )
190 .map_err(to_pyvalue_err)?;
191
192 if use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
193 let exchange = msg.exchange().unwrap();
194 let venue = Venue::from_code(exchange)
195 .unwrap_or_else(|_| panic!("`Venue` not found for exchange {exchange}"));
196 instrument_id.venue = venue;
197 }
198
199 let result = decode_instrument_def_msg(msg, instrument_id, ts_init);
200 match result {
201 Ok(instrument) => instruments.push(instrument),
202 Err(e) => tracing::error!("{e:?}"),
203 }
204 }
205
206 Python::with_gil(|py| {
207 let py_results: PyResult<Vec<PyObject>> = instruments
208 .into_iter()
209 .map(|result| instrument_any_to_pyobject(py, result))
210 .collect();
211
212 py_results.map(|objs| {
213 PyList::new(py, &objs)
214 .expect("Invalid `ExactSizeIterator`")
215 .into_py_any_unwrap(py)
216 })
217 })
218 })
219 }
220
221 #[pyo3(name = "get_range_quotes")]
222 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
223 #[allow(clippy::too_many_arguments)]
224 fn py_get_range_quotes<'py>(
225 &self,
226 py: Python<'py>,
227 dataset: String,
228 instrument_ids: Vec<InstrumentId>,
229 start: u64,
230 end: Option<u64>,
231 limit: Option<u64>,
232 price_precision: Option<u8>,
233 schema: Option<String>,
234 ) -> PyResult<Bound<'py, PyAny>> {
235 let client = self.inner.clone();
236 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
237 let symbols: Vec<String> = instrument_ids
238 .iter()
239 .map(|instrument_id| {
240 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
241 })
242 .collect();
243
244 let stype_in = infer_symbology_type(symbols.first().unwrap());
245 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
246 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
247 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
248 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
249 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
250 let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?;
251 match dbn_schema {
252 dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
253 _ => {
254 return Err(to_pyvalue_err(
255 "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m",
256 ));
257 }
258 }
259 let params = GetRangeParams::builder()
260 .dataset(dataset)
261 .date_time_range(time_range)
262 .symbols(symbols)
263 .stype_in(stype_in)
264 .schema(dbn_schema)
265 .limit(limit.and_then(NonZeroU64::new))
266 .build();
267
268 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
269 let publisher_venue_map = self.publisher_venue_map.clone();
270 let symbol_venue_map = self.symbol_venue_map.clone();
271 let ts_init = self.clock.get_time_ns();
272
273 pyo3_async_runtimes::tokio::future_into_py(py, async move {
274 let mut client = client.lock().await; let mut decoder = client
276 .timeseries()
277 .get_range(¶ms)
278 .await
279 .map_err(to_pyvalue_err)?;
280
281 let metadata = decoder.metadata().clone();
282 let mut result: Vec<QuoteTick> = Vec::new();
283
284 let mut process_record = |record: dbn::RecordRef| -> PyResult<()> {
285 let instrument_id = decode_nautilus_instrument_id(
286 &record,
287 &metadata,
288 &publisher_venue_map,
289 &symbol_venue_map.read().unwrap(),
290 )
291 .map_err(to_pyvalue_err)?;
292
293 let (data, _) = decode_record(
294 &record,
295 instrument_id,
296 price_precision,
297 Some(ts_init),
298 false, )
300 .map_err(to_pyvalue_err)?;
301
302 match data {
303 Some(Data::Quote(quote)) => {
304 result.push(quote);
305 Ok(())
306 }
307 _ => panic!("Invalid data element not `QuoteTick`, was {data:?}"),
308 }
309 };
310
311 match dbn_schema {
312 dbn::Schema::Mbp1 => {
313 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
314 process_record(dbn::RecordRef::from(msg))?;
315 }
316 }
317 dbn::Schema::Bbo1M => {
318 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
319 process_record(dbn::RecordRef::from(msg))?;
320 }
321 }
322 dbn::Schema::Bbo1S => {
323 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
324 process_record(dbn::RecordRef::from(msg))?;
325 }
326 }
327 _ => panic!("Invalid schema {dbn_schema}"),
328 }
329
330 Python::with_gil(|py| result.into_py_any(py))
331 })
332 }
333
334 #[pyo3(name = "get_range_trades")]
335 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
336 #[allow(clippy::too_many_arguments)]
337 fn py_get_range_trades<'py>(
338 &self,
339 py: Python<'py>,
340 dataset: String,
341 instrument_ids: Vec<InstrumentId>,
342 start: u64,
343 end: Option<u64>,
344 limit: Option<u64>,
345 price_precision: Option<u8>,
346 ) -> PyResult<Bound<'py, PyAny>> {
347 let client = self.inner.clone();
348 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
349 let symbols: Vec<String> = instrument_ids
350 .iter()
351 .map(|instrument_id| {
352 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
353 })
354 .collect();
355
356 let stype_in = infer_symbology_type(symbols.first().unwrap());
357 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
358 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
359 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
360 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
361 let params = GetRangeParams::builder()
362 .dataset(dataset)
363 .date_time_range(time_range)
364 .symbols(symbols)
365 .stype_in(stype_in)
366 .schema(dbn::Schema::Trades)
367 .limit(limit.and_then(NonZeroU64::new))
368 .build();
369
370 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
371 let publisher_venue_map = self.publisher_venue_map.clone();
372 let symbol_venue_map = self.symbol_venue_map.clone();
373 let ts_init = self.clock.get_time_ns();
374
375 pyo3_async_runtimes::tokio::future_into_py(py, async move {
376 let mut client = client.lock().await; let mut decoder = client
378 .timeseries()
379 .get_range(¶ms)
380 .await
381 .map_err(to_pyvalue_err)?;
382
383 let metadata = decoder.metadata().clone();
384 let mut result: Vec<TradeTick> = Vec::new();
385
386 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
387 let record = dbn::RecordRef::from(msg);
388 let instrument_id = decode_nautilus_instrument_id(
389 &record,
390 &metadata,
391 &publisher_venue_map,
392 &symbol_venue_map.read().unwrap(),
393 )
394 .map_err(to_pyvalue_err)?;
395
396 let (data, _) = decode_record(
397 &record,
398 instrument_id,
399 price_precision,
400 Some(ts_init),
401 false, )
403 .map_err(to_pyvalue_err)?;
404
405 match data {
406 Some(Data::Trade(trade)) => {
407 result.push(trade);
408 }
409 _ => panic!("Invalid data element not `TradeTick`, was {data:?}"),
410 }
411 }
412
413 Python::with_gil(|py| result.into_py_any(py))
414 })
415 }
416
417 #[pyo3(name = "get_range_bars")]
418 #[allow(clippy::too_many_arguments)]
419 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None))]
420 fn py_get_range_bars<'py>(
421 &self,
422 py: Python<'py>,
423 dataset: String,
424 instrument_ids: Vec<InstrumentId>,
425 aggregation: BarAggregation,
426 start: u64,
427 end: Option<u64>,
428 limit: Option<u64>,
429 price_precision: Option<u8>,
430 ) -> PyResult<Bound<'py, PyAny>> {
431 let client = self.inner.clone();
432 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
433 let symbols: Vec<String> = instrument_ids
434 .iter()
435 .map(|instrument_id| {
436 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
437 })
438 .collect();
439
440 let stype_in = infer_symbology_type(symbols.first().unwrap());
441 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
442 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
443 let schema = match aggregation {
444 BarAggregation::Second => dbn::Schema::Ohlcv1S,
445 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
446 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
447 BarAggregation::Day => dbn::Schema::Ohlcv1D,
448 _ => panic!("Invalid `BarAggregation` for request, was {aggregation}"),
449 };
450 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
451 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
452 let params = GetRangeParams::builder()
453 .dataset(dataset)
454 .date_time_range(time_range)
455 .symbols(symbols)
456 .stype_in(stype_in)
457 .schema(schema)
458 .limit(limit.and_then(NonZeroU64::new))
459 .build();
460
461 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
462 let publisher_venue_map = self.publisher_venue_map.clone();
463 let symbol_venue_map = self.symbol_venue_map.clone();
464 let ts_init = self.clock.get_time_ns();
465
466 pyo3_async_runtimes::tokio::future_into_py(py, async move {
467 let mut client = client.lock().await; let mut decoder = client
469 .timeseries()
470 .get_range(¶ms)
471 .await
472 .map_err(to_pyvalue_err)?;
473
474 let metadata = decoder.metadata().clone();
475 let mut result: Vec<Bar> = Vec::new();
476
477 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
478 let record = dbn::RecordRef::from(msg);
479 let instrument_id = decode_nautilus_instrument_id(
480 &record,
481 &metadata,
482 &publisher_venue_map,
483 &symbol_venue_map.read().unwrap(),
484 )
485 .map_err(to_pyvalue_err)?;
486
487 let (data, _) = decode_record(
488 &record,
489 instrument_id,
490 price_precision,
491 Some(ts_init),
492 false, )
494 .map_err(to_pyvalue_err)?;
495
496 match data {
497 Some(Data::Bar(bar)) => {
498 result.push(bar);
499 }
500 _ => panic!("Invalid data element not `Bar`, was {data:?}"),
501 }
502 }
503
504 Python::with_gil(|py| result.into_py_any(py))
505 })
506 }
507
508 #[pyo3(name = "get_range_imbalance")]
509 #[allow(clippy::too_many_arguments)]
510 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
511 fn py_get_range_imbalance<'py>(
512 &self,
513 py: Python<'py>,
514 dataset: String,
515 instrument_ids: Vec<InstrumentId>,
516 start: u64,
517 end: Option<u64>,
518 limit: Option<u64>,
519 price_precision: Option<u8>,
520 ) -> PyResult<Bound<'py, PyAny>> {
521 let client = self.inner.clone();
522 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
523 let symbols: Vec<String> = instrument_ids
524 .iter()
525 .map(|instrument_id| {
526 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
527 })
528 .collect();
529
530 let stype_in = infer_symbology_type(symbols.first().unwrap());
531 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
532 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
533 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
534 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
535 let params = GetRangeParams::builder()
536 .dataset(dataset)
537 .date_time_range(time_range)
538 .symbols(symbols)
539 .stype_in(stype_in)
540 .schema(dbn::Schema::Imbalance)
541 .limit(limit.and_then(NonZeroU64::new))
542 .build();
543
544 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
545 let publisher_venue_map = self.publisher_venue_map.clone();
546 let symbol_venue_map = self.symbol_venue_map.clone();
547 let ts_init = self.clock.get_time_ns();
548
549 pyo3_async_runtimes::tokio::future_into_py(py, async move {
550 let mut client = client.lock().await; let mut decoder = client
552 .timeseries()
553 .get_range(¶ms)
554 .await
555 .map_err(to_pyvalue_err)?;
556
557 let metadata = decoder.metadata().clone();
558 let mut result: Vec<DatabentoImbalance> = Vec::new();
559
560 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
561 let record = dbn::RecordRef::from(msg);
562 let instrument_id = decode_nautilus_instrument_id(
563 &record,
564 &metadata,
565 &publisher_venue_map,
566 &symbol_venue_map.read().unwrap(),
567 )
568 .map_err(to_pyvalue_err)?;
569
570 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
571 .map_err(to_pyvalue_err)?;
572
573 result.push(imbalance);
574 }
575
576 Python::with_gil(|py| result.into_py_any(py))
577 })
578 }
579
580 #[pyo3(name = "get_range_statistics")]
581 #[allow(clippy::too_many_arguments)]
582 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
583 fn py_get_range_statistics<'py>(
584 &self,
585 py: Python<'py>,
586 dataset: String,
587 instrument_ids: Vec<InstrumentId>,
588 start: u64,
589 end: Option<u64>,
590 limit: Option<u64>,
591 price_precision: Option<u8>,
592 ) -> PyResult<Bound<'py, PyAny>> {
593 let client = self.inner.clone();
594 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
595 let symbols: Vec<String> = instrument_ids
596 .iter()
597 .map(|instrument_id| {
598 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
599 })
600 .collect();
601
602 let stype_in = infer_symbology_type(symbols.first().unwrap());
603 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
604 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
605 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
606 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
607 let params = GetRangeParams::builder()
608 .dataset(dataset)
609 .date_time_range(time_range)
610 .symbols(symbols)
611 .stype_in(stype_in)
612 .schema(dbn::Schema::Statistics)
613 .limit(limit.and_then(NonZeroU64::new))
614 .build();
615
616 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
617 let publisher_venue_map = self.publisher_venue_map.clone();
618 let symbol_venue_map = self.symbol_venue_map.clone();
619 let ts_init = self.clock.get_time_ns();
620
621 pyo3_async_runtimes::tokio::future_into_py(py, async move {
622 let mut client = client.lock().await; let mut decoder = client
624 .timeseries()
625 .get_range(¶ms)
626 .await
627 .map_err(to_pyvalue_err)?;
628
629 let metadata = decoder.metadata().clone();
630 let mut result: Vec<DatabentoStatistics> = Vec::new();
631
632 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
633 let record = dbn::RecordRef::from(msg);
634 let instrument_id = decode_nautilus_instrument_id(
635 &record,
636 &metadata,
637 &publisher_venue_map,
638 &symbol_venue_map.read().unwrap(),
639 )
640 .map_err(to_pyvalue_err)?;
641
642 let statistics =
643 decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
644 .map_err(to_pyvalue_err)?;
645
646 result.push(statistics);
647 }
648
649 Python::with_gil(|py| result.into_py_any(py))
650 })
651 }
652
653 #[pyo3(name = "get_range_status")]
654 #[allow(clippy::too_many_arguments)]
655 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
656 fn py_get_range_status<'py>(
657 &self,
658 py: Python<'py>,
659 dataset: String,
660 instrument_ids: Vec<InstrumentId>,
661 start: u64,
662 end: Option<u64>,
663 limit: Option<u64>,
664 ) -> PyResult<Bound<'py, PyAny>> {
665 let client = self.inner.clone();
666 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
667 let symbols: Vec<String> = instrument_ids
668 .iter()
669 .map(|instrument_id| {
670 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
671 })
672 .collect();
673
674 let stype_in = infer_symbology_type(symbols.first().unwrap());
675 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
676 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
677 let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
678 let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
679 let params = GetRangeParams::builder()
680 .dataset(dataset)
681 .date_time_range(time_range)
682 .symbols(symbols)
683 .stype_in(stype_in)
684 .schema(dbn::Schema::Status)
685 .limit(limit.and_then(NonZeroU64::new))
686 .build();
687
688 let publisher_venue_map = self.publisher_venue_map.clone();
689 let ts_init = self.clock.get_time_ns();
690 let symbol_venue_map = self.symbol_venue_map.clone();
691
692 pyo3_async_runtimes::tokio::future_into_py(py, async move {
693 let mut client = client.lock().await; let mut decoder = client
695 .timeseries()
696 .get_range(¶ms)
697 .await
698 .map_err(to_pyvalue_err)?;
699
700 let metadata = decoder.metadata().clone();
701 let mut result: Vec<InstrumentStatus> = Vec::new();
702
703 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
704 let record = dbn::RecordRef::from(msg);
705 let instrument_id = decode_nautilus_instrument_id(
706 &record,
707 &metadata,
708 &publisher_venue_map,
709 &symbol_venue_map.read().unwrap(),
710 )
711 .map_err(to_pyvalue_err)?;
712
713 let status =
714 decode_status_msg(msg, instrument_id, ts_init).map_err(to_pyvalue_err)?;
715
716 result.push(status);
717 }
718
719 Python::with_gil(|py| result.into_py_any(py))
720 })
721 }
722}