1use std::path::PathBuf;
19
20use nautilus_core::{
21 python::{IntoPyObjectNautilusExt, to_pyexception, to_pyvalue_err},
22 time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25 enums::BarAggregation, identifiers::InstrumentId,
26 python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29 IntoPyObjectExt,
30 prelude::*,
31 types::{PyDict, PyList},
32};
33
34use crate::historical::{
35 DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams,
36};
37
38#[cfg_attr(
40 feature = "python",
41 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
42)]
43#[derive(Debug)]
44pub struct DatabentoHistoricalClient {
45 #[pyo3(get)]
46 pub key: String,
47 inner: CoreDatabentoHistoricalClient,
48}
49
50#[pymethods]
51impl DatabentoHistoricalClient {
52 #[new]
53 fn py_new(
54 key: String,
55 publishers_filepath: PathBuf,
56 use_exchange_as_venue: bool,
57 ) -> PyResult<Self> {
58 let clock = get_atomic_clock_realtime();
59 let inner = CoreDatabentoHistoricalClient::new(
60 key.clone(),
61 publishers_filepath,
62 clock,
63 use_exchange_as_venue,
64 )
65 .map_err(to_pyvalue_err)?;
66
67 Ok(Self { key, inner })
68 }
69
70 #[pyo3(name = "get_dataset_range")]
71 fn py_get_dataset_range<'py>(
72 &self,
73 py: Python<'py>,
74 dataset: String,
75 ) -> PyResult<Bound<'py, PyAny>> {
76 let inner = self.inner.clone();
77
78 pyo3_async_runtimes::tokio::future_into_py(py, async move {
79 let response = inner.get_dataset_range(&dataset).await;
80 match response {
81 Ok(res) => Python::attach(|py| {
82 let dict = PyDict::new(py);
83 dict.set_item("start", res.start)?;
84 dict.set_item("end", res.end)?;
85 dict.into_py_any(py)
86 }),
87 Err(e) => Err(to_pyexception(format!("Error handling response: {e}"))),
88 }
89 })
90 }
91
92 #[pyo3(name = "get_range_instruments")]
93 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
94 #[allow(clippy::too_many_arguments)]
95 fn py_get_range_instruments<'py>(
96 &self,
97 py: Python<'py>,
98 dataset: String,
99 instrument_ids: Vec<InstrumentId>,
100 start: u64,
101 end: Option<u64>,
102 limit: Option<u64>,
103 ) -> PyResult<Bound<'py, PyAny>> {
104 let inner = self.inner.clone();
105 let symbols = inner
106 .prepare_symbols_from_instrument_ids(&instrument_ids)
107 .map_err(to_pyvalue_err)?;
108
109 let params = RangeQueryParams {
110 dataset,
111 symbols,
112 start: start.into(),
113 end: end.map(Into::into),
114 limit,
115 price_precision: None,
116 };
117
118 pyo3_async_runtimes::tokio::future_into_py(py, async move {
119 let instruments = inner
120 .get_range_instruments(params)
121 .await
122 .map_err(to_pyvalue_err)?;
123
124 Python::attach(|py| -> PyResult<Py<PyAny>> {
125 let objs: Vec<Py<PyAny>> = instruments
126 .into_iter()
127 .map(|inst| instrument_any_to_pyobject(py, inst))
128 .collect::<PyResult<Vec<Py<PyAny>>>>()?;
129
130 let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
131 Ok(list.into_py_any_unwrap(py))
132 })
133 })
134 }
135
136 #[pyo3(name = "get_range_quotes")]
137 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
138 #[allow(clippy::too_many_arguments)]
139 fn py_get_range_quotes<'py>(
140 &self,
141 py: Python<'py>,
142 dataset: String,
143 instrument_ids: Vec<InstrumentId>,
144 start: u64,
145 end: Option<u64>,
146 limit: Option<u64>,
147 price_precision: Option<u8>,
148 schema: Option<String>,
149 ) -> PyResult<Bound<'py, PyAny>> {
150 let inner = self.inner.clone();
151 let symbols = inner
152 .prepare_symbols_from_instrument_ids(&instrument_ids)
153 .map_err(to_pyvalue_err)?;
154
155 let params = RangeQueryParams {
156 dataset,
157 symbols,
158 start: start.into(),
159 end: end.map(Into::into),
160 limit,
161 price_precision,
162 };
163
164 pyo3_async_runtimes::tokio::future_into_py(py, async move {
165 let quotes = inner
166 .get_range_quotes(params, schema)
167 .await
168 .map_err(to_pyvalue_err)?;
169 Python::attach(|py| quotes.into_py_any(py))
170 })
171 }
172
173 #[pyo3(name = "get_range_trades")]
174 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
175 #[allow(clippy::too_many_arguments)]
176 fn py_get_range_trades<'py>(
177 &self,
178 py: Python<'py>,
179 dataset: String,
180 instrument_ids: Vec<InstrumentId>,
181 start: u64,
182 end: Option<u64>,
183 limit: Option<u64>,
184 price_precision: Option<u8>,
185 ) -> PyResult<Bound<'py, PyAny>> {
186 let inner = self.inner.clone();
187 let symbols = inner
188 .prepare_symbols_from_instrument_ids(&instrument_ids)
189 .map_err(to_pyvalue_err)?;
190
191 let params = RangeQueryParams {
192 dataset,
193 symbols,
194 start: start.into(),
195 end: end.map(Into::into),
196 limit,
197 price_precision,
198 };
199
200 pyo3_async_runtimes::tokio::future_into_py(py, async move {
201 let trades = inner
202 .get_range_trades(params)
203 .await
204 .map_err(to_pyvalue_err)?;
205 Python::attach(|py| trades.into_py_any(py))
206 })
207 }
208
209 #[pyo3(name = "get_range_bars")]
210 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
211 #[allow(clippy::too_many_arguments)]
212 fn py_get_range_bars<'py>(
213 &self,
214 py: Python<'py>,
215 dataset: String,
216 instrument_ids: Vec<InstrumentId>,
217 aggregation: BarAggregation,
218 start: u64,
219 end: Option<u64>,
220 limit: Option<u64>,
221 price_precision: Option<u8>,
222 timestamp_on_close: bool,
223 ) -> PyResult<Bound<'py, PyAny>> {
224 let inner = self.inner.clone();
225 let symbols = inner
226 .prepare_symbols_from_instrument_ids(&instrument_ids)
227 .map_err(to_pyvalue_err)?;
228
229 let params = RangeQueryParams {
230 dataset,
231 symbols,
232 start: start.into(),
233 end: end.map(Into::into),
234 limit,
235 price_precision,
236 };
237
238 pyo3_async_runtimes::tokio::future_into_py(py, async move {
239 let bars = inner
240 .get_range_bars(params, aggregation, timestamp_on_close)
241 .await
242 .map_err(to_pyvalue_err)?;
243 Python::attach(|py| bars.into_py_any(py))
244 })
245 }
246
247 #[pyo3(name = "get_order_book_depth10")]
248 #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
249 #[allow(clippy::too_many_arguments)]
250 fn py_get_order_book_depth10<'py>(
251 &self,
252 py: Python<'py>,
253 dataset: String,
254 instrument_ids: Vec<InstrumentId>,
255 start: u64,
256 end: Option<u64>,
257 depth: Option<usize>,
258 ) -> PyResult<Bound<'py, PyAny>> {
259 let inner = self.inner.clone();
260 let symbols = inner
261 .prepare_symbols_from_instrument_ids(&instrument_ids)
262 .map_err(to_pyvalue_err)?;
263
264 let params = RangeQueryParams {
265 dataset,
266 symbols,
267 start: start.into(),
268 end: end.map(Into::into),
269 limit: None,
270 price_precision: None,
271 };
272
273 pyo3_async_runtimes::tokio::future_into_py(py, async move {
274 let depths = inner
275 .get_range_order_book_depth10(params, depth)
276 .await
277 .map_err(to_pyvalue_err)?;
278 Python::attach(|py| depths.into_py_any(py))
279 })
280 }
281
282 #[pyo3(name = "get_range_order_book_deltas")]
283 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
284 #[allow(clippy::too_many_arguments)]
285 fn py_get_range_order_book_deltas<'py>(
286 &self,
287 py: Python<'py>,
288 dataset: String,
289 instrument_ids: Vec<InstrumentId>,
290 start: u64,
291 end: Option<u64>,
292 limit: Option<u64>,
293 price_precision: Option<u8>,
294 ) -> PyResult<Bound<'py, PyAny>> {
295 let inner = self.inner.clone();
296 let symbols = inner
297 .prepare_symbols_from_instrument_ids(&instrument_ids)
298 .map_err(to_pyvalue_err)?;
299
300 let params = RangeQueryParams {
301 dataset,
302 symbols,
303 start: start.into(),
304 end: end.map(Into::into),
305 limit,
306 price_precision,
307 };
308
309 pyo3_async_runtimes::tokio::future_into_py(py, async move {
310 let deltas = inner
311 .get_range_order_book_deltas(params)
312 .await
313 .map_err(to_pyvalue_err)?;
314 Python::attach(|py| deltas.into_py_any(py))
315 })
316 }
317
318 #[pyo3(name = "get_range_imbalance")]
319 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
320 #[allow(clippy::too_many_arguments)]
321 fn py_get_range_imbalance<'py>(
322 &self,
323 py: Python<'py>,
324 dataset: String,
325 instrument_ids: Vec<InstrumentId>,
326 start: u64,
327 end: Option<u64>,
328 limit: Option<u64>,
329 price_precision: Option<u8>,
330 ) -> PyResult<Bound<'py, PyAny>> {
331 let inner = self.inner.clone();
332 let symbols = inner
333 .prepare_symbols_from_instrument_ids(&instrument_ids)
334 .map_err(to_pyvalue_err)?;
335
336 let params = RangeQueryParams {
337 dataset,
338 symbols,
339 start: start.into(),
340 end: end.map(Into::into),
341 limit,
342 price_precision,
343 };
344
345 pyo3_async_runtimes::tokio::future_into_py(py, async move {
346 let imbalances = inner
347 .get_range_imbalance(params)
348 .await
349 .map_err(to_pyvalue_err)?;
350 Python::attach(|py| imbalances.into_py_any(py))
351 })
352 }
353
354 #[pyo3(name = "get_range_statistics")]
355 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
356 #[allow(clippy::too_many_arguments)]
357 fn py_get_range_statistics<'py>(
358 &self,
359 py: Python<'py>,
360 dataset: String,
361 instrument_ids: Vec<InstrumentId>,
362 start: u64,
363 end: Option<u64>,
364 limit: Option<u64>,
365 price_precision: Option<u8>,
366 ) -> PyResult<Bound<'py, PyAny>> {
367 let inner = self.inner.clone();
368 let symbols = inner
369 .prepare_symbols_from_instrument_ids(&instrument_ids)
370 .map_err(to_pyvalue_err)?;
371
372 let params = RangeQueryParams {
373 dataset,
374 symbols,
375 start: start.into(),
376 end: end.map(Into::into),
377 limit,
378 price_precision,
379 };
380
381 pyo3_async_runtimes::tokio::future_into_py(py, async move {
382 let statistics = inner
383 .get_range_statistics(params)
384 .await
385 .map_err(to_pyvalue_err)?;
386 Python::attach(|py| statistics.into_py_any(py))
387 })
388 }
389
390 #[pyo3(name = "get_range_status")]
391 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
392 #[allow(clippy::too_many_arguments)]
393 fn py_get_range_status<'py>(
394 &self,
395 py: Python<'py>,
396 dataset: String,
397 instrument_ids: Vec<InstrumentId>,
398 start: u64,
399 end: Option<u64>,
400 limit: Option<u64>,
401 ) -> PyResult<Bound<'py, PyAny>> {
402 let inner = self.inner.clone();
403 let symbols = inner
404 .prepare_symbols_from_instrument_ids(&instrument_ids)
405 .map_err(to_pyvalue_err)?;
406
407 let params = RangeQueryParams {
408 dataset,
409 symbols,
410 start: start.into(),
411 end: end.map(Into::into),
412 limit,
413 price_precision: None,
414 };
415
416 pyo3_async_runtimes::tokio::future_into_py(py, async move {
417 let statuses = inner
418 .get_range_status(params)
419 .await
420 .map_err(to_pyvalue_err)?;
421 Python::attach(|py| statuses.into_py_any(py))
422 })
423 }
424}