nautilus_databento/python/
loader.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, path::PathBuf};
17
18use databento::dbn;
19use nautilus_core::{
20    ffi::cvec::CVec,
21    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22};
23use nautilus_model::{
24    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
25    identifiers::{InstrumentId, Venue},
26    python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29    prelude::*,
30    types::{PyCapsule, PyList},
31};
32use ustr::Ustr;
33
34use crate::{
35    loader::DatabentoDataLoader,
36    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
37};
38
39#[pymethods]
40impl DatabentoDataLoader {
41    #[new]
42    #[pyo3(signature = (publishers_filepath=None))]
43    fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
44        Self::new(publishers_filepath).map_err(to_pyvalue_err)
45    }
46
47    #[pyo3(name = "load_publishers")]
48    fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
49        self.load_publishers(publishers_filepath)
50            .map_err(to_pyvalue_err)
51    }
52
53    #[must_use]
54    #[pyo3(name = "get_publishers")]
55    fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
56        self.get_publishers()
57            .iter()
58            .map(|(&key, value)| (key, value.clone()))
59            .collect::<HashMap<u16, DatabentoPublisher>>()
60    }
61
62    #[pyo3(name = "set_dataset_for_venue")]
63    fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
64        self.set_dataset_for_venue(Ustr::from(&dataset), venue);
65    }
66
67    #[must_use]
68    #[pyo3(name = "get_dataset_for_venue")]
69    fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
70        self.get_dataset_for_venue(venue).map(ToString::to_string)
71    }
72
73    #[must_use]
74    #[pyo3(name = "get_venue_for_publisher")]
75    fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
76        self.get_venue_for_publisher(publisher_id)
77            .map(ToString::to_string)
78    }
79
80    #[pyo3(name = "schema_for_file")]
81    fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
82        self.schema_from_file(&filepath).map_err(to_pyvalue_err)
83    }
84
85    #[pyo3(name = "load_instruments")]
86    fn py_load_instruments(
87        &mut self,
88        py: Python,
89        filepath: PathBuf,
90        use_exchange_as_venue: bool,
91    ) -> PyResult<PyObject> {
92        let iter = self
93            .load_instruments(&filepath, use_exchange_as_venue)
94            .map_err(to_pyvalue_err)?;
95
96        let mut data = Vec::new();
97        for instrument in iter {
98            let py_object = instrument_any_to_pyobject(py, instrument)?;
99            data.push(py_object);
100        }
101
102        Ok(PyList::new(py, &data)
103            .expect("Invalid `ExactSizeIterator`")
104            .into())
105    }
106
107    // Cannot include trades
108    #[pyo3(name = "load_order_book_deltas")]
109    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
110    fn py_load_order_book_deltas(
111        &self,
112        filepath: PathBuf,
113        instrument_id: Option<InstrumentId>,
114        price_precision: Option<u8>,
115    ) -> PyResult<Vec<OrderBookDelta>> {
116        self.load_order_book_deltas(&filepath, instrument_id, price_precision)
117            .map_err(to_pyvalue_err)
118    }
119
120    #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
121    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
122    fn py_load_order_book_deltas_as_pycapsule(
123        &self,
124        py: Python,
125        filepath: PathBuf,
126        instrument_id: Option<InstrumentId>,
127        price_precision: Option<u8>,
128        include_trades: Option<bool>,
129    ) -> PyResult<PyObject> {
130        let iter = self
131            .read_records::<dbn::MboMsg>(
132                &filepath,
133                instrument_id,
134                price_precision,
135                include_trades.unwrap_or(false),
136            )
137            .map_err(to_pyvalue_err)?;
138
139        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
140    }
141
142    #[pyo3(name = "load_order_book_depth10")]
143    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
144    fn py_load_order_book_depth10(
145        &self,
146        filepath: PathBuf,
147        instrument_id: Option<InstrumentId>,
148        price_precision: Option<u8>,
149    ) -> PyResult<Vec<OrderBookDepth10>> {
150        self.load_order_book_depth10(&filepath, instrument_id, price_precision)
151            .map_err(to_pyvalue_err)
152    }
153
154    #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
155    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
156    fn py_load_order_book_depth10_as_pycapsule(
157        &self,
158        py: Python,
159        filepath: PathBuf,
160        instrument_id: Option<InstrumentId>,
161        price_precision: Option<u8>,
162    ) -> PyResult<PyObject> {
163        let iter = self
164            .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false)
165            .map_err(to_pyvalue_err)?;
166
167        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
168    }
169
170    #[pyo3(name = "load_quotes")]
171    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
172    fn py_load_quotes(
173        &self,
174        filepath: PathBuf,
175        instrument_id: Option<InstrumentId>,
176        price_precision: Option<u8>,
177    ) -> PyResult<Vec<QuoteTick>> {
178        self.load_quotes(&filepath, instrument_id, price_precision)
179            .map_err(to_pyvalue_err)
180    }
181
182    #[pyo3(name = "load_quotes_as_pycapsule")]
183    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
184    fn py_load_quotes_as_pycapsule(
185        &self,
186        py: Python,
187        filepath: PathBuf,
188        instrument_id: Option<InstrumentId>,
189        price_precision: Option<u8>,
190        include_trades: Option<bool>,
191    ) -> PyResult<PyObject> {
192        let iter = self
193            .read_records::<dbn::Mbp1Msg>(
194                &filepath,
195                instrument_id,
196                price_precision,
197                include_trades.unwrap_or(false),
198            )
199            .map_err(to_pyvalue_err)?;
200
201        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
202    }
203
204    #[pyo3(name = "load_bbo_quotes")]
205    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
206    fn py_load_bbo_quotes(
207        &self,
208        filepath: PathBuf,
209        instrument_id: Option<InstrumentId>,
210        price_precision: Option<u8>,
211    ) -> PyResult<Vec<QuoteTick>> {
212        self.load_bbo_quotes(&filepath, instrument_id, price_precision)
213            .map_err(to_pyvalue_err)
214    }
215
216    #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
217    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
218    fn py_load_bbo_quotes_as_pycapsule(
219        &self,
220        py: Python,
221        filepath: PathBuf,
222        instrument_id: Option<InstrumentId>,
223        price_precision: Option<u8>,
224    ) -> PyResult<PyObject> {
225        let iter = self
226            .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false)
227            .map_err(to_pyvalue_err)?;
228
229        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
230    }
231
232    #[pyo3(name = "load_tbbo_trades")]
233    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
234    fn py_load_tbbo_trades(
235        &self,
236        filepath: PathBuf,
237        instrument_id: Option<InstrumentId>,
238        price_precision: Option<u8>,
239    ) -> PyResult<Vec<TradeTick>> {
240        self.load_tbbo_trades(&filepath, instrument_id, price_precision)
241            .map_err(to_pyvalue_err)
242    }
243
244    #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
245    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
246    fn py_load_tbbo_trades_as_pycapsule(
247        &self,
248        py: Python,
249        filepath: PathBuf,
250        instrument_id: Option<InstrumentId>,
251        price_precision: Option<u8>,
252    ) -> PyResult<PyObject> {
253        let iter = self
254            .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false)
255            .map_err(to_pyvalue_err)?;
256
257        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
258    }
259
260    #[pyo3(name = "load_trades")]
261    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
262    fn py_load_trades(
263        &self,
264        filepath: PathBuf,
265        instrument_id: Option<InstrumentId>,
266        price_precision: Option<u8>,
267    ) -> PyResult<Vec<TradeTick>> {
268        self.load_trades(&filepath, instrument_id, price_precision)
269            .map_err(to_pyvalue_err)
270    }
271
272    #[pyo3(name = "load_trades_as_pycapsule")]
273    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
274    fn py_load_trades_as_pycapsule(
275        &self,
276        py: Python,
277        filepath: PathBuf,
278        instrument_id: Option<InstrumentId>,
279        price_precision: Option<u8>,
280    ) -> PyResult<PyObject> {
281        let iter = self
282            .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false)
283            .map_err(to_pyvalue_err)?;
284
285        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
286    }
287
288    #[pyo3(name = "load_bars")]
289    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
290    fn py_load_bars(
291        &self,
292        filepath: PathBuf,
293        instrument_id: Option<InstrumentId>,
294        price_precision: Option<u8>,
295    ) -> PyResult<Vec<Bar>> {
296        self.load_bars(&filepath, instrument_id, price_precision)
297            .map_err(to_pyvalue_err)
298    }
299
300    #[pyo3(name = "load_bars_as_pycapsule")]
301    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
302    fn py_load_bars_as_pycapsule(
303        &self,
304        py: Python,
305        filepath: PathBuf,
306        instrument_id: Option<InstrumentId>,
307        price_precision: Option<u8>,
308    ) -> PyResult<PyObject> {
309        let iter = self
310            .read_records::<dbn::OhlcvMsg>(&filepath, instrument_id, price_precision, false)
311            .map_err(to_pyvalue_err)?;
312
313        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
314    }
315
316    #[pyo3(name = "load_status")]
317    #[pyo3(signature = (filepath, instrument_id=None))]
318    fn py_load_status(
319        &self,
320        filepath: PathBuf,
321        instrument_id: Option<InstrumentId>,
322    ) -> PyResult<Vec<InstrumentStatus>> {
323        let iter = self
324            .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
325            .map_err(to_pyvalue_err)?;
326
327        let mut data = Vec::new();
328        for result in iter {
329            match result {
330                Ok(item) => data.push(item),
331                Err(e) => return Err(to_pyvalue_err(e)),
332            }
333        }
334
335        Ok(data)
336    }
337
338    #[pyo3(name = "load_imbalance")]
339    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
340    fn py_load_imbalance(
341        &self,
342        filepath: PathBuf,
343        instrument_id: Option<InstrumentId>,
344        price_precision: Option<u8>,
345    ) -> PyResult<Vec<DatabentoImbalance>> {
346        let iter = self
347            .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
348            .map_err(to_pyvalue_err)?;
349
350        let mut data = Vec::new();
351        for result in iter {
352            match result {
353                Ok(item) => data.push(item),
354                Err(e) => return Err(to_pyvalue_err(e)),
355            }
356        }
357
358        Ok(data)
359    }
360
361    #[pyo3(name = "load_statistics")]
362    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
363    fn py_load_statistics(
364        &self,
365        filepath: PathBuf,
366        instrument_id: Option<InstrumentId>,
367        price_precision: Option<u8>,
368    ) -> PyResult<Vec<DatabentoStatistics>> {
369        let iter = self
370            .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
371            .map_err(to_pyvalue_err)?;
372
373        let mut data = Vec::new();
374        for result in iter {
375            match result {
376                Ok(item) => data.push(item),
377                Err(e) => return Err(to_pyvalue_err(e)),
378            }
379        }
380
381        Ok(data)
382    }
383}
384
385fn exhaust_data_iter_to_pycapsule(
386    py: Python,
387    iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
388) -> anyhow::Result<PyObject> {
389    let mut data = Vec::new();
390    for result in iter {
391        match result {
392            Ok((Some(item1), None)) => data.push(item1),
393            Ok((None, Some(item2))) => data.push(item2),
394            Ok((Some(item1), Some(item2))) => {
395                data.push(item1);
396                data.push(item2);
397            }
398            Ok((None, None)) => {
399                continue;
400            }
401            Err(e) => return Err(e),
402        }
403    }
404
405    let cvec: CVec = data.into();
406    let capsule = PyCapsule::new::<CVec>(py, cvec, None)?;
407
408    // TODO: Improve error domain. Replace anyhow errors with nautilus
409    // errors to unify pyo3 and anyhow errors.
410    Ok(capsule.into_py_any_unwrap(py))
411}