1use std::{collections::HashMap, path::PathBuf};
17
18use databento::dbn;
19use nautilus_core::{
20 ffi::cvec::CVec,
21 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22};
23use nautilus_model::{
24 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
25 identifiers::{InstrumentId, Venue},
26 python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29 prelude::*,
30 types::{PyCapsule, PyList},
31};
32
33use crate::{
34 loader::DatabentoDataLoader,
35 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
36};
37
38#[pymethods]
39impl DatabentoDataLoader {
40 #[new]
41 #[pyo3(signature = (publishers_filepath=None))]
42 fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
43 Self::new(publishers_filepath).map_err(to_pyvalue_err)
44 }
45
46 #[pyo3(name = "load_publishers")]
47 fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
48 self.load_publishers(publishers_filepath)
49 .map_err(to_pyvalue_err)
50 }
51
52 #[must_use]
53 #[pyo3(name = "get_publishers")]
54 fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
55 self.get_publishers()
56 .iter()
57 .map(|(&key, value)| (key, value.clone()))
58 .collect::<HashMap<u16, DatabentoPublisher>>()
59 }
60
61 #[must_use]
62 #[pyo3(name = "get_dataset_for_venue")]
63 fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
64 self.get_dataset_for_venue(venue)
65 .map(std::string::ToString::to_string)
66 }
67
68 #[must_use]
69 #[pyo3(name = "get_venue_for_publisher")]
70 fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
71 self.get_venue_for_publisher(publisher_id)
72 .map(std::string::ToString::to_string)
73 }
74
75 #[pyo3(name = "schema_for_file")]
76 fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
77 self.schema_from_file(&filepath).map_err(to_pyvalue_err)
78 }
79
80 #[pyo3(name = "load_instruments")]
81 fn py_load_instruments(
82 &mut self,
83 py: Python,
84 filepath: PathBuf,
85 use_exchange_as_venue: bool,
86 ) -> PyResult<PyObject> {
87 let iter = self
88 .load_instruments(&filepath, use_exchange_as_venue)
89 .map_err(to_pyvalue_err)?;
90
91 let mut data = Vec::new();
92 for instrument in iter {
93 let py_object = instrument_any_to_pyobject(py, instrument)?;
94 data.push(py_object);
95 }
96
97 Ok(PyList::new(py, &data)
98 .expect("Invalid `ExactSizeIterator`")
99 .into())
100 }
101
102 #[pyo3(name = "load_order_book_deltas")]
104 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
105 fn py_load_order_book_deltas(
106 &self,
107 filepath: PathBuf,
108 instrument_id: Option<InstrumentId>,
109 price_precision: Option<u8>,
110 ) -> PyResult<Vec<OrderBookDelta>> {
111 self.load_order_book_deltas(&filepath, instrument_id, price_precision)
112 .map_err(to_pyvalue_err)
113 }
114
115 #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
116 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
117 fn py_load_order_book_deltas_as_pycapsule(
118 &self,
119 py: Python,
120 filepath: PathBuf,
121 instrument_id: Option<InstrumentId>,
122 price_precision: Option<u8>,
123 include_trades: Option<bool>,
124 ) -> PyResult<PyObject> {
125 let iter = self
126 .read_records::<dbn::MboMsg>(
127 &filepath,
128 instrument_id,
129 price_precision,
130 include_trades.unwrap_or(false),
131 )
132 .map_err(to_pyvalue_err)?;
133
134 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
135 }
136
137 #[pyo3(name = "load_order_book_depth10")]
138 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
139 fn py_load_order_book_depth10(
140 &self,
141 filepath: PathBuf,
142 instrument_id: Option<InstrumentId>,
143 price_precision: Option<u8>,
144 ) -> PyResult<Vec<OrderBookDepth10>> {
145 self.load_order_book_depth10(&filepath, instrument_id, price_precision)
146 .map_err(to_pyvalue_err)
147 }
148
149 #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
150 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
151 fn py_load_order_book_depth10_as_pycapsule(
152 &self,
153 py: Python,
154 filepath: PathBuf,
155 instrument_id: Option<InstrumentId>,
156 price_precision: Option<u8>,
157 ) -> PyResult<PyObject> {
158 let iter = self
159 .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false)
160 .map_err(to_pyvalue_err)?;
161
162 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
163 }
164
165 #[pyo3(name = "load_quotes")]
166 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
167 fn py_load_quotes(
168 &self,
169 filepath: PathBuf,
170 instrument_id: Option<InstrumentId>,
171 price_precision: Option<u8>,
172 ) -> PyResult<Vec<QuoteTick>> {
173 self.load_quotes(&filepath, instrument_id, price_precision)
174 .map_err(to_pyvalue_err)
175 }
176
177 #[pyo3(name = "load_quotes_as_pycapsule")]
178 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
179 fn py_load_quotes_as_pycapsule(
180 &self,
181 py: Python,
182 filepath: PathBuf,
183 instrument_id: Option<InstrumentId>,
184 price_precision: Option<u8>,
185 include_trades: Option<bool>,
186 ) -> PyResult<PyObject> {
187 let iter = self
188 .read_records::<dbn::Mbp1Msg>(
189 &filepath,
190 instrument_id,
191 price_precision,
192 include_trades.unwrap_or(false),
193 )
194 .map_err(to_pyvalue_err)?;
195
196 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
197 }
198
199 #[pyo3(name = "load_bbo_quotes")]
200 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
201 fn py_load_bbo_quotes(
202 &self,
203 filepath: PathBuf,
204 instrument_id: Option<InstrumentId>,
205 price_precision: Option<u8>,
206 ) -> PyResult<Vec<QuoteTick>> {
207 self.load_bbo_quotes(&filepath, instrument_id, price_precision)
208 .map_err(to_pyvalue_err)
209 }
210
211 #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
212 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
213 fn py_load_bbo_quotes_as_pycapsule(
214 &self,
215 py: Python,
216 filepath: PathBuf,
217 instrument_id: Option<InstrumentId>,
218 price_precision: Option<u8>,
219 ) -> PyResult<PyObject> {
220 let iter = self
221 .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false)
222 .map_err(to_pyvalue_err)?;
223
224 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
225 }
226
227 #[pyo3(name = "load_tbbo_trades")]
228 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
229 fn py_load_tbbo_trades(
230 &self,
231 filepath: PathBuf,
232 instrument_id: Option<InstrumentId>,
233 price_precision: Option<u8>,
234 ) -> PyResult<Vec<TradeTick>> {
235 self.load_tbbo_trades(&filepath, instrument_id, price_precision)
236 .map_err(to_pyvalue_err)
237 }
238
239 #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
240 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
241 fn py_load_tbbo_trades_as_pycapsule(
242 &self,
243 py: Python,
244 filepath: PathBuf,
245 instrument_id: Option<InstrumentId>,
246 price_precision: Option<u8>,
247 ) -> PyResult<PyObject> {
248 let iter = self
249 .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false)
250 .map_err(to_pyvalue_err)?;
251
252 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
253 }
254
255 #[pyo3(name = "load_trades")]
256 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
257 fn py_load_trades(
258 &self,
259 filepath: PathBuf,
260 instrument_id: Option<InstrumentId>,
261 price_precision: Option<u8>,
262 ) -> PyResult<Vec<TradeTick>> {
263 self.load_trades(&filepath, instrument_id, price_precision)
264 .map_err(to_pyvalue_err)
265 }
266
267 #[pyo3(name = "load_trades_as_pycapsule")]
268 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
269 fn py_load_trades_as_pycapsule(
270 &self,
271 py: Python,
272 filepath: PathBuf,
273 instrument_id: Option<InstrumentId>,
274 price_precision: Option<u8>,
275 ) -> PyResult<PyObject> {
276 let iter = self
277 .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false)
278 .map_err(to_pyvalue_err)?;
279
280 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
281 }
282
283 #[pyo3(name = "load_bars")]
284 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
285 fn py_load_bars(
286 &self,
287 filepath: PathBuf,
288 instrument_id: Option<InstrumentId>,
289 price_precision: Option<u8>,
290 ) -> PyResult<Vec<Bar>> {
291 self.load_bars(&filepath, instrument_id, price_precision)
292 .map_err(to_pyvalue_err)
293 }
294
295 #[pyo3(name = "load_bars_as_pycapsule")]
296 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
297 fn py_load_bars_as_pycapsule(
298 &self,
299 py: Python,
300 filepath: PathBuf,
301 instrument_id: Option<InstrumentId>,
302 price_precision: Option<u8>,
303 ) -> PyResult<PyObject> {
304 let iter = self
305 .read_records::<dbn::OhlcvMsg>(&filepath, instrument_id, price_precision, false)
306 .map_err(to_pyvalue_err)?;
307
308 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
309 }
310
311 #[pyo3(name = "load_status")]
312 #[pyo3(signature = (filepath, instrument_id=None))]
313 fn py_load_status(
314 &self,
315 filepath: PathBuf,
316 instrument_id: Option<InstrumentId>,
317 ) -> PyResult<Vec<InstrumentStatus>> {
318 let iter = self
319 .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
320 .map_err(to_pyvalue_err)?;
321
322 let mut data = Vec::new();
323 for result in iter {
324 match result {
325 Ok(item) => data.push(item),
326 Err(e) => return Err(to_pyvalue_err(e)),
327 }
328 }
329
330 Ok(data)
331 }
332
333 #[pyo3(name = "load_imbalance")]
334 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
335 fn py_load_imbalance(
336 &self,
337 filepath: PathBuf,
338 instrument_id: Option<InstrumentId>,
339 price_precision: Option<u8>,
340 ) -> PyResult<Vec<DatabentoImbalance>> {
341 let iter = self
342 .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
343 .map_err(to_pyvalue_err)?;
344
345 let mut data = Vec::new();
346 for result in iter {
347 match result {
348 Ok(item) => data.push(item),
349 Err(e) => return Err(to_pyvalue_err(e)),
350 }
351 }
352
353 Ok(data)
354 }
355
356 #[pyo3(name = "load_statistics")]
357 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
358 fn py_load_statistics(
359 &self,
360 filepath: PathBuf,
361 instrument_id: Option<InstrumentId>,
362 price_precision: Option<u8>,
363 ) -> PyResult<Vec<DatabentoStatistics>> {
364 let iter = self
365 .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
366 .map_err(to_pyvalue_err)?;
367
368 let mut data = Vec::new();
369 for result in iter {
370 match result {
371 Ok(item) => data.push(item),
372 Err(e) => return Err(to_pyvalue_err(e)),
373 }
374 }
375
376 Ok(data)
377 }
378}
379
380fn exhaust_data_iter_to_pycapsule(
381 py: Python,
382 iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
383) -> anyhow::Result<PyObject> {
384 let mut data = Vec::new();
385 for result in iter {
386 match result {
387 Ok((Some(item1), None)) => data.push(item1),
388 Ok((None, Some(item2))) => data.push(item2),
389 Ok((Some(item1), Some(item2))) => {
390 data.push(item1);
391 data.push(item2);
392 }
393 Ok((None, None)) => {
394 continue;
395 }
396 Err(e) => return Err(e),
397 }
398 }
399
400 let cvec: CVec = data.into();
401 let capsule = PyCapsule::new::<CVec>(py, cvec, None)?;
402
403 Ok(capsule.into_py_any_unwrap(py))
406}