Skip to main content

nautilus_databento/python/
historical.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the Databento historical client.
17
18use std::path::PathBuf;
19
20use nautilus_core::{
21    python::{IntoPyObjectNautilusExt, to_pyexception, to_pyvalue_err},
22    time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25    enums::BarAggregation, identifiers::InstrumentId,
26    python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29    IntoPyObjectExt,
30    prelude::*,
31    types::{PyDict, PyList},
32};
33
34use crate::historical::{
35    DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams,
36};
37
38/// Python wrapper for the core Databento historical client.
39#[cfg_attr(
40    feature = "python",
41    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
42)]
43#[derive(Debug)]
44pub struct DatabentoHistoricalClient {
45    #[pyo3(get)]
46    pub key: String,
47    inner: CoreDatabentoHistoricalClient,
48}
49
50#[pymethods]
51impl DatabentoHistoricalClient {
52    #[new]
53    fn py_new(
54        key: String,
55        publishers_filepath: PathBuf,
56        use_exchange_as_venue: bool,
57    ) -> PyResult<Self> {
58        let clock = get_atomic_clock_realtime();
59        let inner = CoreDatabentoHistoricalClient::new(
60            key.clone(),
61            publishers_filepath,
62            clock,
63            use_exchange_as_venue,
64        )
65        .map_err(to_pyvalue_err)?;
66
67        Ok(Self { key, inner })
68    }
69
70    #[pyo3(name = "get_dataset_range")]
71    fn py_get_dataset_range<'py>(
72        &self,
73        py: Python<'py>,
74        dataset: String,
75    ) -> PyResult<Bound<'py, PyAny>> {
76        let inner = self.inner.clone();
77
78        pyo3_async_runtimes::tokio::future_into_py(py, async move {
79            let response = inner.get_dataset_range(&dataset).await;
80            match response {
81                Ok(res) => Python::attach(|py| {
82                    let dict = PyDict::new(py);
83                    dict.set_item("start", res.start)?;
84                    dict.set_item("end", res.end)?;
85                    dict.into_py_any(py)
86                }),
87                Err(e) => Err(to_pyexception(format!("Error handling response: {e}"))),
88            }
89        })
90    }
91
92    #[pyo3(name = "get_range_instruments")]
93    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
94    #[allow(clippy::too_many_arguments)]
95    fn py_get_range_instruments<'py>(
96        &self,
97        py: Python<'py>,
98        dataset: String,
99        instrument_ids: Vec<InstrumentId>,
100        start: u64,
101        end: Option<u64>,
102        limit: Option<u64>,
103    ) -> PyResult<Bound<'py, PyAny>> {
104        let inner = self.inner.clone();
105        let symbols = inner
106            .prepare_symbols_from_instrument_ids(&instrument_ids)
107            .map_err(to_pyvalue_err)?;
108
109        let params = RangeQueryParams {
110            dataset,
111            symbols,
112            start: start.into(),
113            end: end.map(Into::into),
114            limit,
115            price_precision: None,
116        };
117
118        pyo3_async_runtimes::tokio::future_into_py(py, async move {
119            let instruments = inner
120                .get_range_instruments(params)
121                .await
122                .map_err(to_pyvalue_err)?;
123
124            Python::attach(|py| -> PyResult<Py<PyAny>> {
125                let objs: Vec<Py<PyAny>> = instruments
126                    .into_iter()
127                    .map(|inst| instrument_any_to_pyobject(py, inst))
128                    .collect::<PyResult<Vec<Py<PyAny>>>>()?;
129
130                let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
131                Ok(list.into_py_any_unwrap(py))
132            })
133        })
134    }
135
136    #[pyo3(name = "get_range_quotes")]
137    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
138    #[allow(clippy::too_many_arguments)]
139    fn py_get_range_quotes<'py>(
140        &self,
141        py: Python<'py>,
142        dataset: String,
143        instrument_ids: Vec<InstrumentId>,
144        start: u64,
145        end: Option<u64>,
146        limit: Option<u64>,
147        price_precision: Option<u8>,
148        schema: Option<String>,
149    ) -> PyResult<Bound<'py, PyAny>> {
150        let inner = self.inner.clone();
151        let symbols = inner
152            .prepare_symbols_from_instrument_ids(&instrument_ids)
153            .map_err(to_pyvalue_err)?;
154
155        let params = RangeQueryParams {
156            dataset,
157            symbols,
158            start: start.into(),
159            end: end.map(Into::into),
160            limit,
161            price_precision,
162        };
163
164        pyo3_async_runtimes::tokio::future_into_py(py, async move {
165            let quotes = inner
166                .get_range_quotes(params, schema)
167                .await
168                .map_err(to_pyvalue_err)?;
169            Python::attach(|py| quotes.into_py_any(py))
170        })
171    }
172
173    #[pyo3(name = "get_range_trades")]
174    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
175    #[allow(clippy::too_many_arguments)]
176    fn py_get_range_trades<'py>(
177        &self,
178        py: Python<'py>,
179        dataset: String,
180        instrument_ids: Vec<InstrumentId>,
181        start: u64,
182        end: Option<u64>,
183        limit: Option<u64>,
184        price_precision: Option<u8>,
185    ) -> PyResult<Bound<'py, PyAny>> {
186        let inner = self.inner.clone();
187        let symbols = inner
188            .prepare_symbols_from_instrument_ids(&instrument_ids)
189            .map_err(to_pyvalue_err)?;
190
191        let params = RangeQueryParams {
192            dataset,
193            symbols,
194            start: start.into(),
195            end: end.map(Into::into),
196            limit,
197            price_precision,
198        };
199
200        pyo3_async_runtimes::tokio::future_into_py(py, async move {
201            let trades = inner
202                .get_range_trades(params)
203                .await
204                .map_err(to_pyvalue_err)?;
205            Python::attach(|py| trades.into_py_any(py))
206        })
207    }
208
209    #[pyo3(name = "get_range_bars")]
210    #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
211    #[allow(clippy::too_many_arguments)]
212    fn py_get_range_bars<'py>(
213        &self,
214        py: Python<'py>,
215        dataset: String,
216        instrument_ids: Vec<InstrumentId>,
217        aggregation: BarAggregation,
218        start: u64,
219        end: Option<u64>,
220        limit: Option<u64>,
221        price_precision: Option<u8>,
222        timestamp_on_close: bool,
223    ) -> PyResult<Bound<'py, PyAny>> {
224        let inner = self.inner.clone();
225        let symbols = inner
226            .prepare_symbols_from_instrument_ids(&instrument_ids)
227            .map_err(to_pyvalue_err)?;
228
229        let params = RangeQueryParams {
230            dataset,
231            symbols,
232            start: start.into(),
233            end: end.map(Into::into),
234            limit,
235            price_precision,
236        };
237
238        pyo3_async_runtimes::tokio::future_into_py(py, async move {
239            let bars = inner
240                .get_range_bars(params, aggregation, timestamp_on_close)
241                .await
242                .map_err(to_pyvalue_err)?;
243            Python::attach(|py| bars.into_py_any(py))
244        })
245    }
246
247    #[pyo3(name = "get_order_book_depth10")]
248    #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
249    #[allow(clippy::too_many_arguments)]
250    fn py_get_order_book_depth10<'py>(
251        &self,
252        py: Python<'py>,
253        dataset: String,
254        instrument_ids: Vec<InstrumentId>,
255        start: u64,
256        end: Option<u64>,
257        depth: Option<usize>,
258    ) -> PyResult<Bound<'py, PyAny>> {
259        let inner = self.inner.clone();
260        let symbols = inner
261            .prepare_symbols_from_instrument_ids(&instrument_ids)
262            .map_err(to_pyvalue_err)?;
263
264        let params = RangeQueryParams {
265            dataset,
266            symbols,
267            start: start.into(),
268            end: end.map(Into::into),
269            limit: None,
270            price_precision: None,
271        };
272
273        pyo3_async_runtimes::tokio::future_into_py(py, async move {
274            let depths = inner
275                .get_range_order_book_depth10(params, depth)
276                .await
277                .map_err(to_pyvalue_err)?;
278            Python::attach(|py| depths.into_py_any(py))
279        })
280    }
281
282    #[pyo3(name = "get_range_order_book_deltas")]
283    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
284    #[allow(clippy::too_many_arguments)]
285    fn py_get_range_order_book_deltas<'py>(
286        &self,
287        py: Python<'py>,
288        dataset: String,
289        instrument_ids: Vec<InstrumentId>,
290        start: u64,
291        end: Option<u64>,
292        limit: Option<u64>,
293        price_precision: Option<u8>,
294    ) -> PyResult<Bound<'py, PyAny>> {
295        let inner = self.inner.clone();
296        let symbols = inner
297            .prepare_symbols_from_instrument_ids(&instrument_ids)
298            .map_err(to_pyvalue_err)?;
299
300        let params = RangeQueryParams {
301            dataset,
302            symbols,
303            start: start.into(),
304            end: end.map(Into::into),
305            limit,
306            price_precision,
307        };
308
309        pyo3_async_runtimes::tokio::future_into_py(py, async move {
310            let deltas = inner
311                .get_range_order_book_deltas(params)
312                .await
313                .map_err(to_pyvalue_err)?;
314            Python::attach(|py| deltas.into_py_any(py))
315        })
316    }
317
318    #[pyo3(name = "get_range_imbalance")]
319    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
320    #[allow(clippy::too_many_arguments)]
321    fn py_get_range_imbalance<'py>(
322        &self,
323        py: Python<'py>,
324        dataset: String,
325        instrument_ids: Vec<InstrumentId>,
326        start: u64,
327        end: Option<u64>,
328        limit: Option<u64>,
329        price_precision: Option<u8>,
330    ) -> PyResult<Bound<'py, PyAny>> {
331        let inner = self.inner.clone();
332        let symbols = inner
333            .prepare_symbols_from_instrument_ids(&instrument_ids)
334            .map_err(to_pyvalue_err)?;
335
336        let params = RangeQueryParams {
337            dataset,
338            symbols,
339            start: start.into(),
340            end: end.map(Into::into),
341            limit,
342            price_precision,
343        };
344
345        pyo3_async_runtimes::tokio::future_into_py(py, async move {
346            let imbalances = inner
347                .get_range_imbalance(params)
348                .await
349                .map_err(to_pyvalue_err)?;
350            Python::attach(|py| imbalances.into_py_any(py))
351        })
352    }
353
354    #[pyo3(name = "get_range_statistics")]
355    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
356    #[allow(clippy::too_many_arguments)]
357    fn py_get_range_statistics<'py>(
358        &self,
359        py: Python<'py>,
360        dataset: String,
361        instrument_ids: Vec<InstrumentId>,
362        start: u64,
363        end: Option<u64>,
364        limit: Option<u64>,
365        price_precision: Option<u8>,
366    ) -> PyResult<Bound<'py, PyAny>> {
367        let inner = self.inner.clone();
368        let symbols = inner
369            .prepare_symbols_from_instrument_ids(&instrument_ids)
370            .map_err(to_pyvalue_err)?;
371
372        let params = RangeQueryParams {
373            dataset,
374            symbols,
375            start: start.into(),
376            end: end.map(Into::into),
377            limit,
378            price_precision,
379        };
380
381        pyo3_async_runtimes::tokio::future_into_py(py, async move {
382            let statistics = inner
383                .get_range_statistics(params)
384                .await
385                .map_err(to_pyvalue_err)?;
386            Python::attach(|py| statistics.into_py_any(py))
387        })
388    }
389
390    #[pyo3(name = "get_range_status")]
391    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
392    #[allow(clippy::too_many_arguments)]
393    fn py_get_range_status<'py>(
394        &self,
395        py: Python<'py>,
396        dataset: String,
397        instrument_ids: Vec<InstrumentId>,
398        start: u64,
399        end: Option<u64>,
400        limit: Option<u64>,
401    ) -> PyResult<Bound<'py, PyAny>> {
402        let inner = self.inner.clone();
403        let symbols = inner
404            .prepare_symbols_from_instrument_ids(&instrument_ids)
405            .map_err(to_pyvalue_err)?;
406
407        let params = RangeQueryParams {
408            dataset,
409            symbols,
410            start: start.into(),
411            end: end.map(Into::into),
412            limit,
413            price_precision: None,
414        };
415
416        pyo3_async_runtimes::tokio::future_into_py(py, async move {
417            let statuses = inner
418                .get_range_status(params)
419                .await
420                .map_err(to_pyvalue_err)?;
421            Python::attach(|py| statuses.into_py_any(py))
422        })
423    }
424}