1use std::path::PathBuf;
19
20use nautilus_core::{
21 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22 time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25 enums::BarAggregation, identifiers::InstrumentId,
26 python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29 IntoPyObjectExt,
30 exceptions::PyException,
31 prelude::*,
32 types::{PyDict, PyList},
33};
34
35use crate::historical::{
36 DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams,
37};
38
39#[cfg_attr(
41 feature = "python",
42 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[derive(Debug)]
45pub struct DatabentoHistoricalClient {
46 #[pyo3(get)]
47 pub key: String,
48 inner: CoreDatabentoHistoricalClient,
49}
50
51#[pymethods]
52impl DatabentoHistoricalClient {
53 #[new]
54 fn py_new(
55 key: String,
56 publishers_filepath: PathBuf,
57 use_exchange_as_venue: bool,
58 ) -> PyResult<Self> {
59 let clock = get_atomic_clock_realtime();
60 let inner = CoreDatabentoHistoricalClient::new(
61 key.clone(),
62 publishers_filepath,
63 clock,
64 use_exchange_as_venue,
65 )
66 .map_err(to_pyvalue_err)?;
67
68 Ok(Self { key, inner })
69 }
70
71 #[pyo3(name = "get_dataset_range")]
72 fn py_get_dataset_range<'py>(
73 &self,
74 py: Python<'py>,
75 dataset: String,
76 ) -> PyResult<Bound<'py, PyAny>> {
77 let inner = self.inner.clone();
78
79 pyo3_async_runtimes::tokio::future_into_py(py, async move {
80 let response = inner.get_dataset_range(&dataset).await;
81 match response {
82 Ok(res) => Python::attach(|py| {
83 let dict = PyDict::new(py);
84 dict.set_item("start", res.start)?;
85 dict.set_item("end", res.end)?;
86 dict.into_py_any(py)
87 }),
88 Err(e) => Err(PyErr::new::<PyException, _>(format!(
89 "Error handling response: {e}"
90 ))),
91 }
92 })
93 }
94
95 #[pyo3(name = "get_range_instruments")]
96 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
97 #[allow(clippy::too_many_arguments)]
98 fn py_get_range_instruments<'py>(
99 &self,
100 py: Python<'py>,
101 dataset: String,
102 instrument_ids: Vec<InstrumentId>,
103 start: u64,
104 end: Option<u64>,
105 limit: Option<u64>,
106 ) -> PyResult<Bound<'py, PyAny>> {
107 let inner = self.inner.clone();
108 let symbols = inner
109 .prepare_symbols_from_instrument_ids(&instrument_ids)
110 .map_err(to_pyvalue_err)?;
111
112 let params = RangeQueryParams {
113 dataset,
114 symbols,
115 start: start.into(),
116 end: end.map(Into::into),
117 limit,
118 price_precision: None,
119 };
120
121 pyo3_async_runtimes::tokio::future_into_py(py, async move {
122 let instruments = inner
123 .get_range_instruments(params)
124 .await
125 .map_err(to_pyvalue_err)?;
126
127 Python::attach(|py| -> PyResult<Py<PyAny>> {
128 let objs: Vec<Py<PyAny>> = instruments
129 .into_iter()
130 .map(|inst| instrument_any_to_pyobject(py, inst))
131 .collect::<PyResult<Vec<Py<PyAny>>>>()?;
132
133 let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
134 Ok(list.into_py_any_unwrap(py))
135 })
136 })
137 }
138
139 #[pyo3(name = "get_range_quotes")]
140 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
141 #[allow(clippy::too_many_arguments)]
142 fn py_get_range_quotes<'py>(
143 &self,
144 py: Python<'py>,
145 dataset: String,
146 instrument_ids: Vec<InstrumentId>,
147 start: u64,
148 end: Option<u64>,
149 limit: Option<u64>,
150 price_precision: Option<u8>,
151 schema: Option<String>,
152 ) -> PyResult<Bound<'py, PyAny>> {
153 let inner = self.inner.clone();
154 let symbols = inner
155 .prepare_symbols_from_instrument_ids(&instrument_ids)
156 .map_err(to_pyvalue_err)?;
157
158 let params = RangeQueryParams {
159 dataset,
160 symbols,
161 start: start.into(),
162 end: end.map(Into::into),
163 limit,
164 price_precision,
165 };
166
167 pyo3_async_runtimes::tokio::future_into_py(py, async move {
168 let quotes = inner
169 .get_range_quotes(params, schema)
170 .await
171 .map_err(to_pyvalue_err)?;
172 Python::attach(|py| quotes.into_py_any(py))
173 })
174 }
175
176 #[pyo3(name = "get_range_trades")]
177 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
178 #[allow(clippy::too_many_arguments)]
179 fn py_get_range_trades<'py>(
180 &self,
181 py: Python<'py>,
182 dataset: String,
183 instrument_ids: Vec<InstrumentId>,
184 start: u64,
185 end: Option<u64>,
186 limit: Option<u64>,
187 price_precision: Option<u8>,
188 ) -> PyResult<Bound<'py, PyAny>> {
189 let inner = self.inner.clone();
190 let symbols = inner
191 .prepare_symbols_from_instrument_ids(&instrument_ids)
192 .map_err(to_pyvalue_err)?;
193
194 let params = RangeQueryParams {
195 dataset,
196 symbols,
197 start: start.into(),
198 end: end.map(Into::into),
199 limit,
200 price_precision,
201 };
202
203 pyo3_async_runtimes::tokio::future_into_py(py, async move {
204 let trades = inner
205 .get_range_trades(params)
206 .await
207 .map_err(to_pyvalue_err)?;
208 Python::attach(|py| trades.into_py_any(py))
209 })
210 }
211
212 #[pyo3(name = "get_range_bars")]
213 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
214 #[allow(clippy::too_many_arguments)]
215 fn py_get_range_bars<'py>(
216 &self,
217 py: Python<'py>,
218 dataset: String,
219 instrument_ids: Vec<InstrumentId>,
220 aggregation: BarAggregation,
221 start: u64,
222 end: Option<u64>,
223 limit: Option<u64>,
224 price_precision: Option<u8>,
225 timestamp_on_close: bool,
226 ) -> PyResult<Bound<'py, PyAny>> {
227 let inner = self.inner.clone();
228 let symbols = inner
229 .prepare_symbols_from_instrument_ids(&instrument_ids)
230 .map_err(to_pyvalue_err)?;
231
232 let params = RangeQueryParams {
233 dataset,
234 symbols,
235 start: start.into(),
236 end: end.map(Into::into),
237 limit,
238 price_precision,
239 };
240
241 pyo3_async_runtimes::tokio::future_into_py(py, async move {
242 let bars = inner
243 .get_range_bars(params, aggregation, timestamp_on_close)
244 .await
245 .map_err(to_pyvalue_err)?;
246 Python::attach(|py| bars.into_py_any(py))
247 })
248 }
249
250 #[pyo3(name = "get_order_book_depth10")]
251 #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
252 #[allow(clippy::too_many_arguments)]
253 fn py_get_order_book_depth10<'py>(
254 &self,
255 py: Python<'py>,
256 dataset: String,
257 instrument_ids: Vec<InstrumentId>,
258 start: u64,
259 end: Option<u64>,
260 depth: Option<usize>,
261 ) -> PyResult<Bound<'py, PyAny>> {
262 let inner = self.inner.clone();
263 let symbols = inner
264 .prepare_symbols_from_instrument_ids(&instrument_ids)
265 .map_err(to_pyvalue_err)?;
266
267 let params = RangeQueryParams {
268 dataset,
269 symbols,
270 start: start.into(),
271 end: end.map(Into::into),
272 limit: None,
273 price_precision: None,
274 };
275
276 pyo3_async_runtimes::tokio::future_into_py(py, async move {
277 let depths = inner
278 .get_range_order_book_depth10(params, depth)
279 .await
280 .map_err(to_pyvalue_err)?;
281 Python::attach(|py| depths.into_py_any(py))
282 })
283 }
284
285 #[pyo3(name = "get_range_imbalance")]
286 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
287 #[allow(clippy::too_many_arguments)]
288 fn py_get_range_imbalance<'py>(
289 &self,
290 py: Python<'py>,
291 dataset: String,
292 instrument_ids: Vec<InstrumentId>,
293 start: u64,
294 end: Option<u64>,
295 limit: Option<u64>,
296 price_precision: Option<u8>,
297 ) -> PyResult<Bound<'py, PyAny>> {
298 let inner = self.inner.clone();
299 let symbols = inner
300 .prepare_symbols_from_instrument_ids(&instrument_ids)
301 .map_err(to_pyvalue_err)?;
302
303 let params = RangeQueryParams {
304 dataset,
305 symbols,
306 start: start.into(),
307 end: end.map(Into::into),
308 limit,
309 price_precision,
310 };
311
312 pyo3_async_runtimes::tokio::future_into_py(py, async move {
313 let imbalances = inner
314 .get_range_imbalance(params)
315 .await
316 .map_err(to_pyvalue_err)?;
317 Python::attach(|py| imbalances.into_py_any(py))
318 })
319 }
320
321 #[pyo3(name = "get_range_statistics")]
322 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
323 #[allow(clippy::too_many_arguments)]
324 fn py_get_range_statistics<'py>(
325 &self,
326 py: Python<'py>,
327 dataset: String,
328 instrument_ids: Vec<InstrumentId>,
329 start: u64,
330 end: Option<u64>,
331 limit: Option<u64>,
332 price_precision: Option<u8>,
333 ) -> PyResult<Bound<'py, PyAny>> {
334 let inner = self.inner.clone();
335 let symbols = inner
336 .prepare_symbols_from_instrument_ids(&instrument_ids)
337 .map_err(to_pyvalue_err)?;
338
339 let params = RangeQueryParams {
340 dataset,
341 symbols,
342 start: start.into(),
343 end: end.map(Into::into),
344 limit,
345 price_precision,
346 };
347
348 pyo3_async_runtimes::tokio::future_into_py(py, async move {
349 let statistics = inner
350 .get_range_statistics(params)
351 .await
352 .map_err(to_pyvalue_err)?;
353 Python::attach(|py| statistics.into_py_any(py))
354 })
355 }
356
357 #[pyo3(name = "get_range_status")]
358 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
359 #[allow(clippy::too_many_arguments)]
360 fn py_get_range_status<'py>(
361 &self,
362 py: Python<'py>,
363 dataset: String,
364 instrument_ids: Vec<InstrumentId>,
365 start: u64,
366 end: Option<u64>,
367 limit: Option<u64>,
368 ) -> PyResult<Bound<'py, PyAny>> {
369 let inner = self.inner.clone();
370 let symbols = inner
371 .prepare_symbols_from_instrument_ids(&instrument_ids)
372 .map_err(to_pyvalue_err)?;
373
374 let params = RangeQueryParams {
375 dataset,
376 symbols,
377 start: start.into(),
378 end: end.map(Into::into),
379 limit,
380 price_precision: None,
381 };
382
383 pyo3_async_runtimes::tokio::future_into_py(py, async move {
384 let statuses = inner
385 .get_range_status(params)
386 .await
387 .map_err(to_pyvalue_err)?;
388 Python::attach(|py| statuses.into_py_any(py))
389 })
390 }
391}