nautilus_databento/python/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento data loader.
17
18use std::{collections::HashMap, path::PathBuf};
19
20use databento::dbn;
21use nautilus_core::{
22    ffi::cvec::CVec,
23    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
24};
25use nautilus_model::{
26    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
27    identifiers::{InstrumentId, Venue},
28    python::instruments::instrument_any_to_pyobject,
29};
30use pyo3::{
31    prelude::*,
32    types::{PyCapsule, PyList},
33};
34use ustr::Ustr;
35
36use crate::{
37    loader::DatabentoDataLoader,
38    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
39};
40
41#[pymethods]
42impl DatabentoDataLoader {
43    #[new]
44    #[pyo3(signature = (publishers_filepath=None))]
45    fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
46        Self::new(publishers_filepath).map_err(to_pyvalue_err)
47    }
48
49    #[pyo3(name = "load_publishers")]
50    fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
51        self.load_publishers(publishers_filepath)
52            .map_err(to_pyvalue_err)
53    }
54
55    #[must_use]
56    #[pyo3(name = "get_publishers")]
57    fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
58        self.get_publishers()
59            .iter()
60            .map(|(&key, value)| (key, value.clone()))
61            .collect::<HashMap<u16, DatabentoPublisher>>()
62    }
63
64    #[pyo3(name = "set_dataset_for_venue")]
65    fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
66        self.set_dataset_for_venue(Ustr::from(&dataset), venue);
67    }
68
69    #[must_use]
70    #[pyo3(name = "get_dataset_for_venue")]
71    fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
72        self.get_dataset_for_venue(venue).map(ToString::to_string)
73    }
74
75    #[must_use]
76    #[pyo3(name = "get_venue_for_publisher")]
77    fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
78        self.get_venue_for_publisher(publisher_id)
79            .map(ToString::to_string)
80    }
81
82    #[pyo3(name = "schema_for_file")]
83    fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
84        self.schema_from_file(&filepath).map_err(to_pyvalue_err)
85    }
86
87    #[pyo3(name = "load_instruments")]
88    fn py_load_instruments(
89        &mut self,
90        py: Python,
91        filepath: PathBuf,
92        use_exchange_as_venue: bool,
93    ) -> PyResult<Py<PyAny>> {
94        let iter = self
95            .load_instruments(&filepath, use_exchange_as_venue)
96            .map_err(to_pyvalue_err)?;
97
98        let mut data = Vec::new();
99        for instrument in iter {
100            let py_object = instrument_any_to_pyobject(py, instrument)?;
101            data.push(py_object);
102        }
103
104        let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
105
106        Ok(list.into_py_any_unwrap(py))
107    }
108
109    // Cannot include trades
110    #[pyo3(name = "load_order_book_deltas")]
111    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
112    fn py_load_order_book_deltas(
113        &self,
114        filepath: PathBuf,
115        instrument_id: Option<InstrumentId>,
116        price_precision: Option<u8>,
117    ) -> PyResult<Vec<OrderBookDelta>> {
118        self.load_order_book_deltas(&filepath, instrument_id, price_precision)
119            .map_err(to_pyvalue_err)
120    }
121
122    #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
123    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
124    fn py_load_order_book_deltas_as_pycapsule(
125        &self,
126        py: Python,
127        filepath: PathBuf,
128        instrument_id: Option<InstrumentId>,
129        price_precision: Option<u8>,
130        include_trades: Option<bool>,
131    ) -> PyResult<Py<PyAny>> {
132        let iter = self
133            .read_records::<dbn::MboMsg>(
134                &filepath,
135                instrument_id,
136                price_precision,
137                include_trades.unwrap_or(false),
138                None,
139            )
140            .map_err(to_pyvalue_err)?;
141
142        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
143    }
144
145    #[pyo3(name = "load_order_book_depth10")]
146    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
147    fn py_load_order_book_depth10(
148        &self,
149        filepath: PathBuf,
150        instrument_id: Option<InstrumentId>,
151        price_precision: Option<u8>,
152    ) -> PyResult<Vec<OrderBookDepth10>> {
153        self.load_order_book_depth10(&filepath, instrument_id, price_precision)
154            .map_err(to_pyvalue_err)
155    }
156
157    #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
158    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
159    fn py_load_order_book_depth10_as_pycapsule(
160        &self,
161        py: Python,
162        filepath: PathBuf,
163        instrument_id: Option<InstrumentId>,
164        price_precision: Option<u8>,
165    ) -> PyResult<Py<PyAny>> {
166        let iter = self
167            .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
168            .map_err(to_pyvalue_err)?;
169
170        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
171    }
172
173    #[pyo3(name = "load_quotes")]
174    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
175    fn py_load_quotes(
176        &self,
177        filepath: PathBuf,
178        instrument_id: Option<InstrumentId>,
179        price_precision: Option<u8>,
180    ) -> PyResult<Vec<QuoteTick>> {
181        self.load_quotes(&filepath, instrument_id, price_precision)
182            .map_err(to_pyvalue_err)
183    }
184
185    #[pyo3(name = "load_quotes_as_pycapsule")]
186    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
187    fn py_load_quotes_as_pycapsule(
188        &self,
189        py: Python,
190        filepath: PathBuf,
191        instrument_id: Option<InstrumentId>,
192        price_precision: Option<u8>,
193        include_trades: Option<bool>,
194    ) -> PyResult<Py<PyAny>> {
195        let iter = self
196            .read_records::<dbn::Mbp1Msg>(
197                &filepath,
198                instrument_id,
199                price_precision,
200                include_trades.unwrap_or(false),
201                None,
202            )
203            .map_err(to_pyvalue_err)?;
204
205        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
206    }
207
208    #[pyo3(name = "load_bbo_quotes")]
209    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
210    fn py_load_bbo_quotes(
211        &self,
212        filepath: PathBuf,
213        instrument_id: Option<InstrumentId>,
214        price_precision: Option<u8>,
215    ) -> PyResult<Vec<QuoteTick>> {
216        self.load_bbo_quotes(&filepath, instrument_id, price_precision)
217            .map_err(to_pyvalue_err)
218    }
219
220    #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
221    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
222    fn py_load_bbo_quotes_as_pycapsule(
223        &self,
224        py: Python,
225        filepath: PathBuf,
226        instrument_id: Option<InstrumentId>,
227        price_precision: Option<u8>,
228    ) -> PyResult<Py<PyAny>> {
229        let iter = self
230            .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
231            .map_err(to_pyvalue_err)?;
232
233        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
234    }
235
236    #[pyo3(name = "load_cmbp_quotes")]
237    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
238    fn py_load_cmbp_quotes(
239        &self,
240        filepath: PathBuf,
241        instrument_id: Option<InstrumentId>,
242        price_precision: Option<u8>,
243    ) -> PyResult<Vec<QuoteTick>> {
244        self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
245            .map_err(to_pyvalue_err)
246    }
247
248    #[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
249    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
250    fn py_load_cmbp_quotes_as_pycapsule(
251        &self,
252        py: Python,
253        filepath: PathBuf,
254        instrument_id: Option<InstrumentId>,
255        price_precision: Option<u8>,
256        include_trades: Option<bool>,
257    ) -> PyResult<Py<PyAny>> {
258        let iter = self
259            .read_records::<dbn::Cmbp1Msg>(
260                &filepath,
261                instrument_id,
262                price_precision,
263                include_trades.unwrap_or(false),
264                None,
265            )
266            .map_err(to_pyvalue_err)?;
267
268        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
269    }
270
271    #[pyo3(name = "load_cbbo_quotes")]
272    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
273    fn py_load_cbbo_quotes(
274        &self,
275        filepath: PathBuf,
276        instrument_id: Option<InstrumentId>,
277        price_precision: Option<u8>,
278    ) -> PyResult<Vec<QuoteTick>> {
279        self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
280            .map_err(to_pyvalue_err)
281    }
282
283    #[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
284    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
285    fn py_load_cbbo_quotes_as_pycapsule(
286        &self,
287        py: Python,
288        filepath: PathBuf,
289        instrument_id: Option<InstrumentId>,
290        price_precision: Option<u8>,
291    ) -> PyResult<Py<PyAny>> {
292        let iter = self
293            .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
294            .map_err(to_pyvalue_err)?;
295
296        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
297    }
298
299    #[pyo3(name = "load_tbbo_trades")]
300    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
301    fn py_load_tbbo_trades(
302        &self,
303        filepath: PathBuf,
304        instrument_id: Option<InstrumentId>,
305        price_precision: Option<u8>,
306    ) -> PyResult<Vec<TradeTick>> {
307        self.load_tbbo_trades(&filepath, instrument_id, price_precision)
308            .map_err(to_pyvalue_err)
309    }
310
311    #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
312    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
313    fn py_load_tbbo_trades_as_pycapsule(
314        &self,
315        py: Python,
316        filepath: PathBuf,
317        instrument_id: Option<InstrumentId>,
318        price_precision: Option<u8>,
319    ) -> PyResult<Py<PyAny>> {
320        let iter = self
321            .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
322            .map_err(to_pyvalue_err)?;
323
324        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
325    }
326
327    #[pyo3(name = "load_tcbbo_trades")]
328    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
329    fn py_load_tcbbo_trades(
330        &self,
331        filepath: PathBuf,
332        instrument_id: Option<InstrumentId>,
333        price_precision: Option<u8>,
334    ) -> PyResult<Vec<TradeTick>> {
335        self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
336            .map_err(to_pyvalue_err)
337    }
338
339    #[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
340    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
341    fn py_load_tcbbo_trades_as_pycapsule(
342        &self,
343        py: Python,
344        filepath: PathBuf,
345        instrument_id: Option<InstrumentId>,
346        price_precision: Option<u8>,
347    ) -> PyResult<Py<PyAny>> {
348        let iter = self
349            .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
350            .map_err(to_pyvalue_err)?;
351
352        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
353    }
354
355    #[pyo3(name = "load_trades")]
356    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
357    fn py_load_trades(
358        &self,
359        filepath: PathBuf,
360        instrument_id: Option<InstrumentId>,
361        price_precision: Option<u8>,
362    ) -> PyResult<Vec<TradeTick>> {
363        self.load_trades(&filepath, instrument_id, price_precision)
364            .map_err(to_pyvalue_err)
365    }
366
367    #[pyo3(name = "load_trades_as_pycapsule")]
368    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
369    fn py_load_trades_as_pycapsule(
370        &self,
371        py: Python,
372        filepath: PathBuf,
373        instrument_id: Option<InstrumentId>,
374        price_precision: Option<u8>,
375    ) -> PyResult<Py<PyAny>> {
376        let iter = self
377            .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
378            .map_err(to_pyvalue_err)?;
379
380        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
381    }
382
383    #[pyo3(name = "load_bars")]
384    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
385    fn py_load_bars(
386        &self,
387        filepath: PathBuf,
388        instrument_id: Option<InstrumentId>,
389        price_precision: Option<u8>,
390        timestamp_on_close: bool,
391    ) -> PyResult<Vec<Bar>> {
392        self.load_bars(
393            &filepath,
394            instrument_id,
395            price_precision,
396            Some(timestamp_on_close),
397        )
398        .map_err(to_pyvalue_err)
399    }
400
401    #[pyo3(name = "load_bars_as_pycapsule")]
402    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
403    fn py_load_bars_as_pycapsule(
404        &self,
405        py: Python,
406        filepath: PathBuf,
407        instrument_id: Option<InstrumentId>,
408        price_precision: Option<u8>,
409        timestamp_on_close: bool,
410    ) -> PyResult<Py<PyAny>> {
411        let iter = self
412            .read_records::<dbn::OhlcvMsg>(
413                &filepath,
414                instrument_id,
415                price_precision,
416                false,
417                Some(timestamp_on_close),
418            )
419            .map_err(to_pyvalue_err)?;
420
421        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
422    }
423
424    #[pyo3(name = "load_status")]
425    #[pyo3(signature = (filepath, instrument_id=None))]
426    fn py_load_status(
427        &self,
428        filepath: PathBuf,
429        instrument_id: Option<InstrumentId>,
430    ) -> PyResult<Vec<InstrumentStatus>> {
431        let iter = self
432            .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
433            .map_err(to_pyvalue_err)?;
434
435        let mut data = Vec::new();
436        for result in iter {
437            match result {
438                Ok(item) => data.push(item),
439                Err(e) => return Err(to_pyvalue_err(e)),
440            }
441        }
442
443        Ok(data)
444    }
445
446    #[pyo3(name = "load_imbalance")]
447    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
448    fn py_load_imbalance(
449        &self,
450        filepath: PathBuf,
451        instrument_id: Option<InstrumentId>,
452        price_precision: Option<u8>,
453    ) -> PyResult<Vec<DatabentoImbalance>> {
454        let iter = self
455            .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
456            .map_err(to_pyvalue_err)?;
457
458        let mut data = Vec::new();
459        for result in iter {
460            match result {
461                Ok(item) => data.push(item),
462                Err(e) => return Err(to_pyvalue_err(e)),
463            }
464        }
465
466        Ok(data)
467    }
468
469    #[pyo3(name = "load_statistics")]
470    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
471    fn py_load_statistics(
472        &self,
473        filepath: PathBuf,
474        instrument_id: Option<InstrumentId>,
475        price_precision: Option<u8>,
476    ) -> PyResult<Vec<DatabentoStatistics>> {
477        let iter = self
478            .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
479            .map_err(to_pyvalue_err)?;
480
481        let mut data = Vec::new();
482        for result in iter {
483            match result {
484                Ok(item) => data.push(item),
485                Err(e) => return Err(to_pyvalue_err(e)),
486            }
487        }
488
489        Ok(data)
490    }
491}
492
493fn exhaust_data_iter_to_pycapsule(
494    py: Python,
495    iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
496) -> anyhow::Result<Py<PyAny>> {
497    let mut data = Vec::new();
498    for result in iter {
499        match result {
500            Ok((Some(item1), None)) => data.push(item1),
501            Ok((None, Some(item2))) => data.push(item2),
502            Ok((Some(item1), Some(item2))) => {
503                data.push(item1);
504                data.push(item2);
505            }
506            Ok((None, None)) => {
507                continue;
508            }
509            Err(e) => return Err(e),
510        }
511    }
512
513    let cvec: CVec = data.into();
514    let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
515
516    // TODO: Improve error domain. Replace anyhow errors with nautilus
517    // errors to unify pyo3 and anyhow errors.
518    Ok(capsule.into_py_any_unwrap(py))
519}