1use std::path::PathBuf;
19
20use nautilus_core::{
21 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22 time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25 enums::BarAggregation, identifiers::InstrumentId,
26 python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29 IntoPyObjectExt,
30 exceptions::PyException,
31 prelude::*,
32 types::{PyDict, PyList},
33};
34
35use crate::historical::{
36 DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams,
37};
38
39#[cfg_attr(
41 feature = "python",
42 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[derive(Debug)]
45pub struct DatabentoHistoricalClient {
46 #[pyo3(get)]
47 pub key: String,
48 inner: CoreDatabentoHistoricalClient,
49}
50
51#[pymethods]
52impl DatabentoHistoricalClient {
53 #[new]
54 fn py_new(
55 key: String,
56 publishers_filepath: PathBuf,
57 use_exchange_as_venue: bool,
58 ) -> PyResult<Self> {
59 let clock = get_atomic_clock_realtime();
60 let inner = CoreDatabentoHistoricalClient::new(
61 key.clone(),
62 publishers_filepath,
63 clock,
64 use_exchange_as_venue,
65 )
66 .map_err(to_pyvalue_err)?;
67
68 Ok(Self { key, inner })
69 }
70
71 #[pyo3(name = "get_dataset_range")]
72 fn py_get_dataset_range<'py>(
73 &self,
74 py: Python<'py>,
75 dataset: String,
76 ) -> PyResult<Bound<'py, PyAny>> {
77 let inner = self.inner.clone();
78
79 pyo3_async_runtimes::tokio::future_into_py(py, async move {
80 let response = inner.get_dataset_range(&dataset).await;
81 match response {
82 Ok(res) => Python::with_gil(|py| {
83 let dict = PyDict::new(py);
84 dict.set_item("start", res.start)?;
85 dict.set_item("end", res.end)?;
86 dict.into_py_any(py)
87 }),
88 Err(e) => Err(PyErr::new::<PyException, _>(format!(
89 "Error handling response: {e}"
90 ))),
91 }
92 })
93 }
94
95 #[pyo3(name = "get_range_instruments")]
96 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
97 #[allow(clippy::too_many_arguments)]
98 fn py_get_range_instruments<'py>(
99 &self,
100 py: Python<'py>,
101 dataset: String,
102 instrument_ids: Vec<InstrumentId>,
103 start: u64,
104 end: Option<u64>,
105 limit: Option<u64>,
106 ) -> PyResult<Bound<'py, PyAny>> {
107 let inner = self.inner.clone();
108 let symbols = inner
109 .prepare_symbols_from_instrument_ids(&instrument_ids)
110 .map_err(to_pyvalue_err)?;
111
112 let params = RangeQueryParams {
113 dataset,
114 symbols,
115 start: start.into(),
116 end: end.map(Into::into),
117 limit,
118 price_precision: None,
119 };
120
121 pyo3_async_runtimes::tokio::future_into_py(py, async move {
122 let instruments = inner
123 .get_range_instruments(params)
124 .await
125 .map_err(to_pyvalue_err)?;
126
127 Python::with_gil(|py| -> PyResult<PyObject> {
128 let objs: Vec<PyObject> = instruments
129 .into_iter()
130 .map(|inst| instrument_any_to_pyobject(py, inst))
131 .collect::<PyResult<Vec<PyObject>>>()?;
132
133 let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
134 Ok(list.into_py_any_unwrap(py))
135 })
136 })
137 }
138
139 #[pyo3(name = "get_range_quotes")]
140 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
141 #[allow(clippy::too_many_arguments)]
142 fn py_get_range_quotes<'py>(
143 &self,
144 py: Python<'py>,
145 dataset: String,
146 instrument_ids: Vec<InstrumentId>,
147 start: u64,
148 end: Option<u64>,
149 limit: Option<u64>,
150 price_precision: Option<u8>,
151 schema: Option<String>,
152 ) -> PyResult<Bound<'py, PyAny>> {
153 let inner = self.inner.clone();
154 let symbols = inner
155 .prepare_symbols_from_instrument_ids(&instrument_ids)
156 .map_err(to_pyvalue_err)?;
157
158 let params = RangeQueryParams {
159 dataset,
160 symbols,
161 start: start.into(),
162 end: end.map(Into::into),
163 limit,
164 price_precision,
165 };
166
167 pyo3_async_runtimes::tokio::future_into_py(py, async move {
168 let quotes = inner
169 .get_range_quotes(params, schema)
170 .await
171 .map_err(to_pyvalue_err)?;
172 Python::with_gil(|py| quotes.into_py_any(py))
173 })
174 }
175
176 #[pyo3(name = "get_range_trades")]
177 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
178 #[allow(clippy::too_many_arguments)]
179 fn py_get_range_trades<'py>(
180 &self,
181 py: Python<'py>,
182 dataset: String,
183 instrument_ids: Vec<InstrumentId>,
184 start: u64,
185 end: Option<u64>,
186 limit: Option<u64>,
187 price_precision: Option<u8>,
188 ) -> PyResult<Bound<'py, PyAny>> {
189 let inner = self.inner.clone();
190 let symbols = inner
191 .prepare_symbols_from_instrument_ids(&instrument_ids)
192 .map_err(to_pyvalue_err)?;
193
194 let params = RangeQueryParams {
195 dataset,
196 symbols,
197 start: start.into(),
198 end: end.map(Into::into),
199 limit,
200 price_precision,
201 };
202
203 pyo3_async_runtimes::tokio::future_into_py(py, async move {
204 let trades = inner
205 .get_range_trades(params)
206 .await
207 .map_err(to_pyvalue_err)?;
208 Python::with_gil(|py| trades.into_py_any(py))
209 })
210 }
211
212 #[pyo3(name = "get_range_bars")]
213 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
214 #[allow(clippy::too_many_arguments)]
215 fn py_get_range_bars<'py>(
216 &self,
217 py: Python<'py>,
218 dataset: String,
219 instrument_ids: Vec<InstrumentId>,
220 aggregation: BarAggregation,
221 start: u64,
222 end: Option<u64>,
223 limit: Option<u64>,
224 price_precision: Option<u8>,
225 timestamp_on_close: bool,
226 ) -> PyResult<Bound<'py, PyAny>> {
227 let inner = self.inner.clone();
228 let symbols = inner
229 .prepare_symbols_from_instrument_ids(&instrument_ids)
230 .map_err(to_pyvalue_err)?;
231
232 let params = RangeQueryParams {
233 dataset,
234 symbols,
235 start: start.into(),
236 end: end.map(Into::into),
237 limit,
238 price_precision,
239 };
240
241 pyo3_async_runtimes::tokio::future_into_py(py, async move {
242 let bars = inner
243 .get_range_bars(params, aggregation, timestamp_on_close)
244 .await
245 .map_err(to_pyvalue_err)?;
246 Python::with_gil(|py| bars.into_py_any(py))
247 })
248 }
249
250 #[pyo3(name = "get_range_imbalance")]
251 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
252 #[allow(clippy::too_many_arguments)]
253 fn py_get_range_imbalance<'py>(
254 &self,
255 py: Python<'py>,
256 dataset: String,
257 instrument_ids: Vec<InstrumentId>,
258 start: u64,
259 end: Option<u64>,
260 limit: Option<u64>,
261 price_precision: Option<u8>,
262 ) -> PyResult<Bound<'py, PyAny>> {
263 let inner = self.inner.clone();
264 let symbols = inner
265 .prepare_symbols_from_instrument_ids(&instrument_ids)
266 .map_err(to_pyvalue_err)?;
267
268 let params = RangeQueryParams {
269 dataset,
270 symbols,
271 start: start.into(),
272 end: end.map(Into::into),
273 limit,
274 price_precision,
275 };
276
277 pyo3_async_runtimes::tokio::future_into_py(py, async move {
278 let imbalances = inner
279 .get_range_imbalance(params)
280 .await
281 .map_err(to_pyvalue_err)?;
282 Python::with_gil(|py| imbalances.into_py_any(py))
283 })
284 }
285
286 #[pyo3(name = "get_range_statistics")]
287 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
288 #[allow(clippy::too_many_arguments)]
289 fn py_get_range_statistics<'py>(
290 &self,
291 py: Python<'py>,
292 dataset: String,
293 instrument_ids: Vec<InstrumentId>,
294 start: u64,
295 end: Option<u64>,
296 limit: Option<u64>,
297 price_precision: Option<u8>,
298 ) -> PyResult<Bound<'py, PyAny>> {
299 let inner = self.inner.clone();
300 let symbols = inner
301 .prepare_symbols_from_instrument_ids(&instrument_ids)
302 .map_err(to_pyvalue_err)?;
303
304 let params = RangeQueryParams {
305 dataset,
306 symbols,
307 start: start.into(),
308 end: end.map(Into::into),
309 limit,
310 price_precision,
311 };
312
313 pyo3_async_runtimes::tokio::future_into_py(py, async move {
314 let statistics = inner
315 .get_range_statistics(params)
316 .await
317 .map_err(to_pyvalue_err)?;
318 Python::with_gil(|py| statistics.into_py_any(py))
319 })
320 }
321
322 #[pyo3(name = "get_range_status")]
323 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
324 #[allow(clippy::too_many_arguments)]
325 fn py_get_range_status<'py>(
326 &self,
327 py: Python<'py>,
328 dataset: String,
329 instrument_ids: Vec<InstrumentId>,
330 start: u64,
331 end: Option<u64>,
332 limit: Option<u64>,
333 ) -> PyResult<Bound<'py, PyAny>> {
334 let inner = self.inner.clone();
335 let symbols = inner
336 .prepare_symbols_from_instrument_ids(&instrument_ids)
337 .map_err(to_pyvalue_err)?;
338
339 let params = RangeQueryParams {
340 dataset,
341 symbols,
342 start: start.into(),
343 end: end.map(Into::into),
344 limit,
345 price_precision: None,
346 };
347
348 pyo3_async_runtimes::tokio::future_into_py(py, async move {
349 let statuses = inner
350 .get_range_status(params)
351 .await
352 .map_err(to_pyvalue_err)?;
353 Python::with_gil(|py| statuses.into_py_any(py))
354 })
355 }
356}