nautilus_databento/python/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, path::PathBuf};
17
18use databento::dbn;
19use nautilus_core::{
20    ffi::cvec::CVec,
21    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22};
23use nautilus_model::{
24    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
25    identifiers::{InstrumentId, Venue},
26    python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29    prelude::*,
30    types::{PyCapsule, PyList},
31};
32
33use crate::{
34    loader::DatabentoDataLoader,
35    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
36};
37
38#[pymethods]
39impl DatabentoDataLoader {
40    #[new]
41    #[pyo3(signature = (publishers_filepath=None))]
42    fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
43        Self::new(publishers_filepath).map_err(to_pyvalue_err)
44    }
45
46    #[pyo3(name = "load_publishers")]
47    fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
48        self.load_publishers(publishers_filepath)
49            .map_err(to_pyvalue_err)
50    }
51
52    #[must_use]
53    #[pyo3(name = "get_publishers")]
54    fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
55        self.get_publishers()
56            .iter()
57            .map(|(&key, value)| (key, value.clone()))
58            .collect::<HashMap<u16, DatabentoPublisher>>()
59    }
60
61    #[must_use]
62    #[pyo3(name = "get_dataset_for_venue")]
63    fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
64        self.get_dataset_for_venue(venue)
65            .map(std::string::ToString::to_string)
66    }
67
68    #[must_use]
69    #[pyo3(name = "get_venue_for_publisher")]
70    fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
71        self.get_venue_for_publisher(publisher_id)
72            .map(std::string::ToString::to_string)
73    }
74
75    #[pyo3(name = "schema_for_file")]
76    fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
77        self.schema_from_file(&filepath).map_err(to_pyvalue_err)
78    }
79
80    #[pyo3(name = "load_instruments")]
81    fn py_load_instruments(
82        &mut self,
83        py: Python,
84        filepath: PathBuf,
85        use_exchange_as_venue: bool,
86    ) -> PyResult<PyObject> {
87        let iter = self
88            .load_instruments(&filepath, use_exchange_as_venue)
89            .map_err(to_pyvalue_err)?;
90
91        let mut data = Vec::new();
92        for instrument in iter {
93            let py_object = instrument_any_to_pyobject(py, instrument)?;
94            data.push(py_object);
95        }
96
97        Ok(PyList::new(py, &data)
98            .expect("Invalid `ExactSizeIterator`")
99            .into())
100    }
101
102    // Cannot include trades
103    #[pyo3(name = "load_order_book_deltas")]
104    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
105    fn py_load_order_book_deltas(
106        &self,
107        filepath: PathBuf,
108        instrument_id: Option<InstrumentId>,
109        price_precision: Option<u8>,
110    ) -> PyResult<Vec<OrderBookDelta>> {
111        self.load_order_book_deltas(&filepath, instrument_id, price_precision)
112            .map_err(to_pyvalue_err)
113    }
114
115    #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
116    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
117    fn py_load_order_book_deltas_as_pycapsule(
118        &self,
119        py: Python,
120        filepath: PathBuf,
121        instrument_id: Option<InstrumentId>,
122        price_precision: Option<u8>,
123        include_trades: Option<bool>,
124    ) -> PyResult<PyObject> {
125        let iter = self
126            .read_records::<dbn::MboMsg>(
127                &filepath,
128                instrument_id,
129                price_precision,
130                include_trades.unwrap_or(false),
131            )
132            .map_err(to_pyvalue_err)?;
133
134        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
135    }
136
137    #[pyo3(name = "load_order_book_depth10")]
138    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
139    fn py_load_order_book_depth10(
140        &self,
141        filepath: PathBuf,
142        instrument_id: Option<InstrumentId>,
143        price_precision: Option<u8>,
144    ) -> PyResult<Vec<OrderBookDepth10>> {
145        self.load_order_book_depth10(&filepath, instrument_id, price_precision)
146            .map_err(to_pyvalue_err)
147    }
148
149    #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
150    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
151    fn py_load_order_book_depth10_as_pycapsule(
152        &self,
153        py: Python,
154        filepath: PathBuf,
155        instrument_id: Option<InstrumentId>,
156        price_precision: Option<u8>,
157    ) -> PyResult<PyObject> {
158        let iter = self
159            .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false)
160            .map_err(to_pyvalue_err)?;
161
162        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
163    }
164
165    #[pyo3(name = "load_quotes")]
166    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
167    fn py_load_quotes(
168        &self,
169        filepath: PathBuf,
170        instrument_id: Option<InstrumentId>,
171        price_precision: Option<u8>,
172    ) -> PyResult<Vec<QuoteTick>> {
173        self.load_quotes(&filepath, instrument_id, price_precision)
174            .map_err(to_pyvalue_err)
175    }
176
177    #[pyo3(name = "load_quotes_as_pycapsule")]
178    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
179    fn py_load_quotes_as_pycapsule(
180        &self,
181        py: Python,
182        filepath: PathBuf,
183        instrument_id: Option<InstrumentId>,
184        price_precision: Option<u8>,
185        include_trades: Option<bool>,
186    ) -> PyResult<PyObject> {
187        let iter = self
188            .read_records::<dbn::Mbp1Msg>(
189                &filepath,
190                instrument_id,
191                price_precision,
192                include_trades.unwrap_or(false),
193            )
194            .map_err(to_pyvalue_err)?;
195
196        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
197    }
198
199    #[pyo3(name = "load_bbo_quotes")]
200    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
201    fn py_load_bbo_quotes(
202        &self,
203        filepath: PathBuf,
204        instrument_id: Option<InstrumentId>,
205        price_precision: Option<u8>,
206    ) -> PyResult<Vec<QuoteTick>> {
207        self.load_bbo_quotes(&filepath, instrument_id, price_precision)
208            .map_err(to_pyvalue_err)
209    }
210
211    #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
212    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
213    fn py_load_bbo_quotes_as_pycapsule(
214        &self,
215        py: Python,
216        filepath: PathBuf,
217        instrument_id: Option<InstrumentId>,
218        price_precision: Option<u8>,
219    ) -> PyResult<PyObject> {
220        let iter = self
221            .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false)
222            .map_err(to_pyvalue_err)?;
223
224        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
225    }
226
227    #[pyo3(name = "load_tbbo_trades")]
228    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
229    fn py_load_tbbo_trades(
230        &self,
231        filepath: PathBuf,
232        instrument_id: Option<InstrumentId>,
233        price_precision: Option<u8>,
234    ) -> PyResult<Vec<TradeTick>> {
235        self.load_tbbo_trades(&filepath, instrument_id, price_precision)
236            .map_err(to_pyvalue_err)
237    }
238
239    #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
240    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
241    fn py_load_tbbo_trades_as_pycapsule(
242        &self,
243        py: Python,
244        filepath: PathBuf,
245        instrument_id: Option<InstrumentId>,
246        price_precision: Option<u8>,
247    ) -> PyResult<PyObject> {
248        let iter = self
249            .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false)
250            .map_err(to_pyvalue_err)?;
251
252        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
253    }
254
255    #[pyo3(name = "load_trades")]
256    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
257    fn py_load_trades(
258        &self,
259        filepath: PathBuf,
260        instrument_id: Option<InstrumentId>,
261        price_precision: Option<u8>,
262    ) -> PyResult<Vec<TradeTick>> {
263        self.load_trades(&filepath, instrument_id, price_precision)
264            .map_err(to_pyvalue_err)
265    }
266
267    #[pyo3(name = "load_trades_as_pycapsule")]
268    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
269    fn py_load_trades_as_pycapsule(
270        &self,
271        py: Python,
272        filepath: PathBuf,
273        instrument_id: Option<InstrumentId>,
274        price_precision: Option<u8>,
275    ) -> PyResult<PyObject> {
276        let iter = self
277            .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false)
278            .map_err(to_pyvalue_err)?;
279
280        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
281    }
282
283    #[pyo3(name = "load_bars")]
284    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
285    fn py_load_bars(
286        &self,
287        filepath: PathBuf,
288        instrument_id: Option<InstrumentId>,
289        price_precision: Option<u8>,
290    ) -> PyResult<Vec<Bar>> {
291        self.load_bars(&filepath, instrument_id, price_precision)
292            .map_err(to_pyvalue_err)
293    }
294
295    #[pyo3(name = "load_bars_as_pycapsule")]
296    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
297    fn py_load_bars_as_pycapsule(
298        &self,
299        py: Python,
300        filepath: PathBuf,
301        instrument_id: Option<InstrumentId>,
302        price_precision: Option<u8>,
303    ) -> PyResult<PyObject> {
304        let iter = self
305            .read_records::<dbn::OhlcvMsg>(&filepath, instrument_id, price_precision, false)
306            .map_err(to_pyvalue_err)?;
307
308        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
309    }
310
311    #[pyo3(name = "load_status")]
312    #[pyo3(signature = (filepath, instrument_id=None))]
313    fn py_load_status(
314        &self,
315        filepath: PathBuf,
316        instrument_id: Option<InstrumentId>,
317    ) -> PyResult<Vec<InstrumentStatus>> {
318        let iter = self
319            .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
320            .map_err(to_pyvalue_err)?;
321
322        let mut data = Vec::new();
323        for result in iter {
324            match result {
325                Ok(item) => data.push(item),
326                Err(e) => return Err(to_pyvalue_err(e)),
327            }
328        }
329
330        Ok(data)
331    }
332
333    #[pyo3(name = "load_imbalance")]
334    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
335    fn py_load_imbalance(
336        &self,
337        filepath: PathBuf,
338        instrument_id: Option<InstrumentId>,
339        price_precision: Option<u8>,
340    ) -> PyResult<Vec<DatabentoImbalance>> {
341        let iter = self
342            .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
343            .map_err(to_pyvalue_err)?;
344
345        let mut data = Vec::new();
346        for result in iter {
347            match result {
348                Ok(item) => data.push(item),
349                Err(e) => return Err(to_pyvalue_err(e)),
350            }
351        }
352
353        Ok(data)
354    }
355
356    #[pyo3(name = "load_statistics")]
357    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
358    fn py_load_statistics(
359        &self,
360        filepath: PathBuf,
361        instrument_id: Option<InstrumentId>,
362        price_precision: Option<u8>,
363    ) -> PyResult<Vec<DatabentoStatistics>> {
364        let iter = self
365            .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
366            .map_err(to_pyvalue_err)?;
367
368        let mut data = Vec::new();
369        for result in iter {
370            match result {
371                Ok(item) => data.push(item),
372                Err(e) => return Err(to_pyvalue_err(e)),
373            }
374        }
375
376        Ok(data)
377    }
378}
379
380fn exhaust_data_iter_to_pycapsule(
381    py: Python,
382    iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
383) -> anyhow::Result<PyObject> {
384    let mut data = Vec::new();
385    for result in iter {
386        match result {
387            Ok((Some(item1), None)) => data.push(item1),
388            Ok((None, Some(item2))) => data.push(item2),
389            Ok((Some(item1), Some(item2))) => {
390                data.push(item1);
391                data.push(item2);
392            }
393            Ok((None, None)) => {
394                continue;
395            }
396            Err(e) => return Err(e),
397        }
398    }
399
400    let cvec: CVec = data.into();
401    let capsule = PyCapsule::new::<CVec>(py, cvec, None)?;
402
403    // TODO: Improve error domain. Replace anyhow errors with nautilus
404    // errors to unify pyo3 and anyhow errors.
405    Ok(capsule.into_py_any_unwrap(py))
406}