nautilus_databento/python/
historical.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the Databento historical client.
17
18use std::path::PathBuf;
19
20use nautilus_core::{
21    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22    time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25    enums::BarAggregation, identifiers::InstrumentId,
26    python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29    IntoPyObjectExt,
30    exceptions::PyException,
31    prelude::*,
32    types::{PyDict, PyList},
33};
34
35use crate::historical::{
36    DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams,
37};
38
39/// Python wrapper for the core Databento historical client.
40#[cfg_attr(
41    feature = "python",
42    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[derive(Debug)]
45pub struct DatabentoHistoricalClient {
46    #[pyo3(get)]
47    pub key: String,
48    inner: CoreDatabentoHistoricalClient,
49}
50
51#[pymethods]
52impl DatabentoHistoricalClient {
53    #[new]
54    fn py_new(
55        key: String,
56        publishers_filepath: PathBuf,
57        use_exchange_as_venue: bool,
58    ) -> PyResult<Self> {
59        let clock = get_atomic_clock_realtime();
60        let inner = CoreDatabentoHistoricalClient::new(
61            key.clone(),
62            publishers_filepath,
63            clock,
64            use_exchange_as_venue,
65        )
66        .map_err(to_pyvalue_err)?;
67
68        Ok(Self { key, inner })
69    }
70
71    #[pyo3(name = "get_dataset_range")]
72    fn py_get_dataset_range<'py>(
73        &self,
74        py: Python<'py>,
75        dataset: String,
76    ) -> PyResult<Bound<'py, PyAny>> {
77        let inner = self.inner.clone();
78
79        pyo3_async_runtimes::tokio::future_into_py(py, async move {
80            let response = inner.get_dataset_range(&dataset).await;
81            match response {
82                Ok(res) => Python::with_gil(|py| {
83                    let dict = PyDict::new(py);
84                    dict.set_item("start", res.start)?;
85                    dict.set_item("end", res.end)?;
86                    dict.into_py_any(py)
87                }),
88                Err(e) => Err(PyErr::new::<PyException, _>(format!(
89                    "Error handling response: {e}"
90                ))),
91            }
92        })
93    }
94
95    #[pyo3(name = "get_range_instruments")]
96    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
97    #[allow(clippy::too_many_arguments)]
98    fn py_get_range_instruments<'py>(
99        &self,
100        py: Python<'py>,
101        dataset: String,
102        instrument_ids: Vec<InstrumentId>,
103        start: u64,
104        end: Option<u64>,
105        limit: Option<u64>,
106    ) -> PyResult<Bound<'py, PyAny>> {
107        let inner = self.inner.clone();
108        let symbols = inner
109            .prepare_symbols_from_instrument_ids(&instrument_ids)
110            .map_err(to_pyvalue_err)?;
111
112        let params = RangeQueryParams {
113            dataset,
114            symbols,
115            start: start.into(),
116            end: end.map(Into::into),
117            limit,
118            price_precision: None,
119        };
120
121        pyo3_async_runtimes::tokio::future_into_py(py, async move {
122            let instruments = inner
123                .get_range_instruments(params)
124                .await
125                .map_err(to_pyvalue_err)?;
126
127            Python::with_gil(|py| -> PyResult<PyObject> {
128                let objs: Vec<PyObject> = instruments
129                    .into_iter()
130                    .map(|inst| instrument_any_to_pyobject(py, inst))
131                    .collect::<PyResult<Vec<PyObject>>>()?;
132
133                let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
134                Ok(list.into_py_any_unwrap(py))
135            })
136        })
137    }
138
139    #[pyo3(name = "get_range_quotes")]
140    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
141    #[allow(clippy::too_many_arguments)]
142    fn py_get_range_quotes<'py>(
143        &self,
144        py: Python<'py>,
145        dataset: String,
146        instrument_ids: Vec<InstrumentId>,
147        start: u64,
148        end: Option<u64>,
149        limit: Option<u64>,
150        price_precision: Option<u8>,
151        schema: Option<String>,
152    ) -> PyResult<Bound<'py, PyAny>> {
153        let inner = self.inner.clone();
154        let symbols = inner
155            .prepare_symbols_from_instrument_ids(&instrument_ids)
156            .map_err(to_pyvalue_err)?;
157
158        let params = RangeQueryParams {
159            dataset,
160            symbols,
161            start: start.into(),
162            end: end.map(Into::into),
163            limit,
164            price_precision,
165        };
166
167        pyo3_async_runtimes::tokio::future_into_py(py, async move {
168            let quotes = inner
169                .get_range_quotes(params, schema)
170                .await
171                .map_err(to_pyvalue_err)?;
172            Python::with_gil(|py| quotes.into_py_any(py))
173        })
174    }
175
176    #[pyo3(name = "get_range_trades")]
177    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
178    #[allow(clippy::too_many_arguments)]
179    fn py_get_range_trades<'py>(
180        &self,
181        py: Python<'py>,
182        dataset: String,
183        instrument_ids: Vec<InstrumentId>,
184        start: u64,
185        end: Option<u64>,
186        limit: Option<u64>,
187        price_precision: Option<u8>,
188    ) -> PyResult<Bound<'py, PyAny>> {
189        let inner = self.inner.clone();
190        let symbols = inner
191            .prepare_symbols_from_instrument_ids(&instrument_ids)
192            .map_err(to_pyvalue_err)?;
193
194        let params = RangeQueryParams {
195            dataset,
196            symbols,
197            start: start.into(),
198            end: end.map(Into::into),
199            limit,
200            price_precision,
201        };
202
203        pyo3_async_runtimes::tokio::future_into_py(py, async move {
204            let trades = inner
205                .get_range_trades(params)
206                .await
207                .map_err(to_pyvalue_err)?;
208            Python::with_gil(|py| trades.into_py_any(py))
209        })
210    }
211
212    #[pyo3(name = "get_range_bars")]
213    #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
214    #[allow(clippy::too_many_arguments)]
215    fn py_get_range_bars<'py>(
216        &self,
217        py: Python<'py>,
218        dataset: String,
219        instrument_ids: Vec<InstrumentId>,
220        aggregation: BarAggregation,
221        start: u64,
222        end: Option<u64>,
223        limit: Option<u64>,
224        price_precision: Option<u8>,
225        timestamp_on_close: bool,
226    ) -> PyResult<Bound<'py, PyAny>> {
227        let inner = self.inner.clone();
228        let symbols = inner
229            .prepare_symbols_from_instrument_ids(&instrument_ids)
230            .map_err(to_pyvalue_err)?;
231
232        let params = RangeQueryParams {
233            dataset,
234            symbols,
235            start: start.into(),
236            end: end.map(Into::into),
237            limit,
238            price_precision,
239        };
240
241        pyo3_async_runtimes::tokio::future_into_py(py, async move {
242            let bars = inner
243                .get_range_bars(params, aggregation, timestamp_on_close)
244                .await
245                .map_err(to_pyvalue_err)?;
246            Python::with_gil(|py| bars.into_py_any(py))
247        })
248    }
249
250    #[pyo3(name = "get_range_imbalance")]
251    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
252    #[allow(clippy::too_many_arguments)]
253    fn py_get_range_imbalance<'py>(
254        &self,
255        py: Python<'py>,
256        dataset: String,
257        instrument_ids: Vec<InstrumentId>,
258        start: u64,
259        end: Option<u64>,
260        limit: Option<u64>,
261        price_precision: Option<u8>,
262    ) -> PyResult<Bound<'py, PyAny>> {
263        let inner = self.inner.clone();
264        let symbols = inner
265            .prepare_symbols_from_instrument_ids(&instrument_ids)
266            .map_err(to_pyvalue_err)?;
267
268        let params = RangeQueryParams {
269            dataset,
270            symbols,
271            start: start.into(),
272            end: end.map(Into::into),
273            limit,
274            price_precision,
275        };
276
277        pyo3_async_runtimes::tokio::future_into_py(py, async move {
278            let imbalances = inner
279                .get_range_imbalance(params)
280                .await
281                .map_err(to_pyvalue_err)?;
282            Python::with_gil(|py| imbalances.into_py_any(py))
283        })
284    }
285
286    #[pyo3(name = "get_range_statistics")]
287    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
288    #[allow(clippy::too_many_arguments)]
289    fn py_get_range_statistics<'py>(
290        &self,
291        py: Python<'py>,
292        dataset: String,
293        instrument_ids: Vec<InstrumentId>,
294        start: u64,
295        end: Option<u64>,
296        limit: Option<u64>,
297        price_precision: Option<u8>,
298    ) -> PyResult<Bound<'py, PyAny>> {
299        let inner = self.inner.clone();
300        let symbols = inner
301            .prepare_symbols_from_instrument_ids(&instrument_ids)
302            .map_err(to_pyvalue_err)?;
303
304        let params = RangeQueryParams {
305            dataset,
306            symbols,
307            start: start.into(),
308            end: end.map(Into::into),
309            limit,
310            price_precision,
311        };
312
313        pyo3_async_runtimes::tokio::future_into_py(py, async move {
314            let statistics = inner
315                .get_range_statistics(params)
316                .await
317                .map_err(to_pyvalue_err)?;
318            Python::with_gil(|py| statistics.into_py_any(py))
319        })
320    }
321
322    #[pyo3(name = "get_range_status")]
323    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
324    #[allow(clippy::too_many_arguments)]
325    fn py_get_range_status<'py>(
326        &self,
327        py: Python<'py>,
328        dataset: String,
329        instrument_ids: Vec<InstrumentId>,
330        start: u64,
331        end: Option<u64>,
332        limit: Option<u64>,
333    ) -> PyResult<Bound<'py, PyAny>> {
334        let inner = self.inner.clone();
335        let symbols = inner
336            .prepare_symbols_from_instrument_ids(&instrument_ids)
337            .map_err(to_pyvalue_err)?;
338
339        let params = RangeQueryParams {
340            dataset,
341            symbols,
342            start: start.into(),
343            end: end.map(Into::into),
344            limit,
345            price_precision: None,
346        };
347
348        pyo3_async_runtimes::tokio::future_into_py(py, async move {
349            let statuses = inner
350                .get_range_status(params)
351                .await
352                .map_err(to_pyvalue_err)?;
353            Python::with_gil(|py| statuses.into_py_any(py))
354        })
355    }
356}