1use std::{collections::HashMap, path::PathBuf};
19
20use databento::dbn;
21use nautilus_core::{
22 ffi::cvec::CVec,
23 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
24};
25use nautilus_model::{
26 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
27 identifiers::{InstrumentId, Venue},
28 python::instruments::instrument_any_to_pyobject,
29};
30use pyo3::{
31 prelude::*,
32 types::{PyCapsule, PyList},
33};
34use ustr::Ustr;
35
36use crate::{
37 loader::DatabentoDataLoader,
38 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
39};
40
41#[pymethods]
42impl DatabentoDataLoader {
43 #[new]
44 #[pyo3(signature = (publishers_filepath=None))]
45 fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
46 Self::new(publishers_filepath).map_err(to_pyvalue_err)
47 }
48
49 #[pyo3(name = "load_publishers")]
50 fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
51 self.load_publishers(publishers_filepath)
52 .map_err(to_pyvalue_err)
53 }
54
55 #[must_use]
56 #[pyo3(name = "get_publishers")]
57 fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
58 self.get_publishers()
59 .iter()
60 .map(|(&key, value)| (key, value.clone()))
61 .collect::<HashMap<u16, DatabentoPublisher>>()
62 }
63
64 #[pyo3(name = "set_dataset_for_venue")]
65 fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
66 self.set_dataset_for_venue(Ustr::from(&dataset), venue);
67 }
68
69 #[must_use]
70 #[pyo3(name = "get_dataset_for_venue")]
71 fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
72 self.get_dataset_for_venue(venue).map(ToString::to_string)
73 }
74
75 #[must_use]
76 #[pyo3(name = "get_venue_for_publisher")]
77 fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
78 self.get_venue_for_publisher(publisher_id)
79 .map(ToString::to_string)
80 }
81
82 #[pyo3(name = "schema_for_file")]
83 fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
84 self.schema_from_file(&filepath).map_err(to_pyvalue_err)
85 }
86
87 #[pyo3(name = "load_instruments")]
88 fn py_load_instruments(
89 &mut self,
90 py: Python,
91 filepath: PathBuf,
92 use_exchange_as_venue: bool,
93 ) -> PyResult<Py<PyAny>> {
94 let iter = self
95 .load_instruments(&filepath, use_exchange_as_venue)
96 .map_err(to_pyvalue_err)?;
97
98 let mut data = Vec::new();
99 for instrument in iter {
100 let py_object = instrument_any_to_pyobject(py, instrument)?;
101 data.push(py_object);
102 }
103
104 let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
105
106 Ok(list.into_py_any_unwrap(py))
107 }
108
109 #[pyo3(name = "load_order_book_deltas")]
111 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
112 fn py_load_order_book_deltas(
113 &self,
114 filepath: PathBuf,
115 instrument_id: Option<InstrumentId>,
116 price_precision: Option<u8>,
117 ) -> PyResult<Vec<OrderBookDelta>> {
118 self.load_order_book_deltas(&filepath, instrument_id, price_precision)
119 .map_err(to_pyvalue_err)
120 }
121
122 #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
123 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
124 fn py_load_order_book_deltas_as_pycapsule(
125 &self,
126 py: Python,
127 filepath: PathBuf,
128 instrument_id: Option<InstrumentId>,
129 price_precision: Option<u8>,
130 include_trades: Option<bool>,
131 ) -> PyResult<Py<PyAny>> {
132 let iter = self
133 .read_records::<dbn::MboMsg>(
134 &filepath,
135 instrument_id,
136 price_precision,
137 include_trades.unwrap_or(false),
138 None,
139 )
140 .map_err(to_pyvalue_err)?;
141
142 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
143 }
144
145 #[pyo3(name = "load_order_book_depth10")]
146 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
147 fn py_load_order_book_depth10(
148 &self,
149 filepath: PathBuf,
150 instrument_id: Option<InstrumentId>,
151 price_precision: Option<u8>,
152 ) -> PyResult<Vec<OrderBookDepth10>> {
153 self.load_order_book_depth10(&filepath, instrument_id, price_precision)
154 .map_err(to_pyvalue_err)
155 }
156
157 #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
158 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
159 fn py_load_order_book_depth10_as_pycapsule(
160 &self,
161 py: Python,
162 filepath: PathBuf,
163 instrument_id: Option<InstrumentId>,
164 price_precision: Option<u8>,
165 ) -> PyResult<Py<PyAny>> {
166 let iter = self
167 .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
168 .map_err(to_pyvalue_err)?;
169
170 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
171 }
172
173 #[pyo3(name = "load_quotes")]
174 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
175 fn py_load_quotes(
176 &self,
177 filepath: PathBuf,
178 instrument_id: Option<InstrumentId>,
179 price_precision: Option<u8>,
180 ) -> PyResult<Vec<QuoteTick>> {
181 self.load_quotes(&filepath, instrument_id, price_precision)
182 .map_err(to_pyvalue_err)
183 }
184
185 #[pyo3(name = "load_quotes_as_pycapsule")]
186 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
187 fn py_load_quotes_as_pycapsule(
188 &self,
189 py: Python,
190 filepath: PathBuf,
191 instrument_id: Option<InstrumentId>,
192 price_precision: Option<u8>,
193 include_trades: Option<bool>,
194 ) -> PyResult<Py<PyAny>> {
195 let iter = self
196 .read_records::<dbn::Mbp1Msg>(
197 &filepath,
198 instrument_id,
199 price_precision,
200 include_trades.unwrap_or(false),
201 None,
202 )
203 .map_err(to_pyvalue_err)?;
204
205 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
206 }
207
208 #[pyo3(name = "load_bbo_quotes")]
209 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
210 fn py_load_bbo_quotes(
211 &self,
212 filepath: PathBuf,
213 instrument_id: Option<InstrumentId>,
214 price_precision: Option<u8>,
215 ) -> PyResult<Vec<QuoteTick>> {
216 self.load_bbo_quotes(&filepath, instrument_id, price_precision)
217 .map_err(to_pyvalue_err)
218 }
219
220 #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
221 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
222 fn py_load_bbo_quotes_as_pycapsule(
223 &self,
224 py: Python,
225 filepath: PathBuf,
226 instrument_id: Option<InstrumentId>,
227 price_precision: Option<u8>,
228 ) -> PyResult<Py<PyAny>> {
229 let iter = self
230 .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
231 .map_err(to_pyvalue_err)?;
232
233 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
234 }
235
236 #[pyo3(name = "load_cmbp_quotes")]
237 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
238 fn py_load_cmbp_quotes(
239 &self,
240 filepath: PathBuf,
241 instrument_id: Option<InstrumentId>,
242 price_precision: Option<u8>,
243 ) -> PyResult<Vec<QuoteTick>> {
244 self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
245 .map_err(to_pyvalue_err)
246 }
247
248 #[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
249 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
250 fn py_load_cmbp_quotes_as_pycapsule(
251 &self,
252 py: Python,
253 filepath: PathBuf,
254 instrument_id: Option<InstrumentId>,
255 price_precision: Option<u8>,
256 include_trades: Option<bool>,
257 ) -> PyResult<Py<PyAny>> {
258 let iter = self
259 .read_records::<dbn::Cmbp1Msg>(
260 &filepath,
261 instrument_id,
262 price_precision,
263 include_trades.unwrap_or(false),
264 None,
265 )
266 .map_err(to_pyvalue_err)?;
267
268 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
269 }
270
271 #[pyo3(name = "load_cbbo_quotes")]
272 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
273 fn py_load_cbbo_quotes(
274 &self,
275 filepath: PathBuf,
276 instrument_id: Option<InstrumentId>,
277 price_precision: Option<u8>,
278 ) -> PyResult<Vec<QuoteTick>> {
279 self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
280 .map_err(to_pyvalue_err)
281 }
282
283 #[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
284 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
285 fn py_load_cbbo_quotes_as_pycapsule(
286 &self,
287 py: Python,
288 filepath: PathBuf,
289 instrument_id: Option<InstrumentId>,
290 price_precision: Option<u8>,
291 ) -> PyResult<Py<PyAny>> {
292 let iter = self
293 .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
294 .map_err(to_pyvalue_err)?;
295
296 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
297 }
298
299 #[pyo3(name = "load_tbbo_trades")]
300 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
301 fn py_load_tbbo_trades(
302 &self,
303 filepath: PathBuf,
304 instrument_id: Option<InstrumentId>,
305 price_precision: Option<u8>,
306 ) -> PyResult<Vec<TradeTick>> {
307 self.load_tbbo_trades(&filepath, instrument_id, price_precision)
308 .map_err(to_pyvalue_err)
309 }
310
311 #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
312 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
313 fn py_load_tbbo_trades_as_pycapsule(
314 &self,
315 py: Python,
316 filepath: PathBuf,
317 instrument_id: Option<InstrumentId>,
318 price_precision: Option<u8>,
319 ) -> PyResult<Py<PyAny>> {
320 let iter = self
321 .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
322 .map_err(to_pyvalue_err)?;
323
324 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
325 }
326
327 #[pyo3(name = "load_tcbbo_trades")]
328 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
329 fn py_load_tcbbo_trades(
330 &self,
331 filepath: PathBuf,
332 instrument_id: Option<InstrumentId>,
333 price_precision: Option<u8>,
334 ) -> PyResult<Vec<TradeTick>> {
335 self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
336 .map_err(to_pyvalue_err)
337 }
338
339 #[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
340 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
341 fn py_load_tcbbo_trades_as_pycapsule(
342 &self,
343 py: Python,
344 filepath: PathBuf,
345 instrument_id: Option<InstrumentId>,
346 price_precision: Option<u8>,
347 ) -> PyResult<Py<PyAny>> {
348 let iter = self
349 .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
350 .map_err(to_pyvalue_err)?;
351
352 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
353 }
354
355 #[pyo3(name = "load_trades")]
356 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
357 fn py_load_trades(
358 &self,
359 filepath: PathBuf,
360 instrument_id: Option<InstrumentId>,
361 price_precision: Option<u8>,
362 ) -> PyResult<Vec<TradeTick>> {
363 self.load_trades(&filepath, instrument_id, price_precision)
364 .map_err(to_pyvalue_err)
365 }
366
367 #[pyo3(name = "load_trades_as_pycapsule")]
368 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
369 fn py_load_trades_as_pycapsule(
370 &self,
371 py: Python,
372 filepath: PathBuf,
373 instrument_id: Option<InstrumentId>,
374 price_precision: Option<u8>,
375 ) -> PyResult<Py<PyAny>> {
376 let iter = self
377 .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
378 .map_err(to_pyvalue_err)?;
379
380 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
381 }
382
383 #[pyo3(name = "load_bars")]
384 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
385 fn py_load_bars(
386 &self,
387 filepath: PathBuf,
388 instrument_id: Option<InstrumentId>,
389 price_precision: Option<u8>,
390 timestamp_on_close: bool,
391 ) -> PyResult<Vec<Bar>> {
392 self.load_bars(
393 &filepath,
394 instrument_id,
395 price_precision,
396 Some(timestamp_on_close),
397 )
398 .map_err(to_pyvalue_err)
399 }
400
401 #[pyo3(name = "load_bars_as_pycapsule")]
402 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
403 fn py_load_bars_as_pycapsule(
404 &self,
405 py: Python,
406 filepath: PathBuf,
407 instrument_id: Option<InstrumentId>,
408 price_precision: Option<u8>,
409 timestamp_on_close: bool,
410 ) -> PyResult<Py<PyAny>> {
411 let iter = self
412 .read_records::<dbn::OhlcvMsg>(
413 &filepath,
414 instrument_id,
415 price_precision,
416 false,
417 Some(timestamp_on_close),
418 )
419 .map_err(to_pyvalue_err)?;
420
421 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
422 }
423
424 #[pyo3(name = "load_status")]
425 #[pyo3(signature = (filepath, instrument_id=None))]
426 fn py_load_status(
427 &self,
428 filepath: PathBuf,
429 instrument_id: Option<InstrumentId>,
430 ) -> PyResult<Vec<InstrumentStatus>> {
431 let iter = self
432 .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
433 .map_err(to_pyvalue_err)?;
434
435 let mut data = Vec::new();
436 for result in iter {
437 match result {
438 Ok(item) => data.push(item),
439 Err(e) => return Err(to_pyvalue_err(e)),
440 }
441 }
442
443 Ok(data)
444 }
445
446 #[pyo3(name = "load_imbalance")]
447 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
448 fn py_load_imbalance(
449 &self,
450 filepath: PathBuf,
451 instrument_id: Option<InstrumentId>,
452 price_precision: Option<u8>,
453 ) -> PyResult<Vec<DatabentoImbalance>> {
454 let iter = self
455 .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
456 .map_err(to_pyvalue_err)?;
457
458 let mut data = Vec::new();
459 for result in iter {
460 match result {
461 Ok(item) => data.push(item),
462 Err(e) => return Err(to_pyvalue_err(e)),
463 }
464 }
465
466 Ok(data)
467 }
468
469 #[pyo3(name = "load_statistics")]
470 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
471 fn py_load_statistics(
472 &self,
473 filepath: PathBuf,
474 instrument_id: Option<InstrumentId>,
475 price_precision: Option<u8>,
476 ) -> PyResult<Vec<DatabentoStatistics>> {
477 let iter = self
478 .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
479 .map_err(to_pyvalue_err)?;
480
481 let mut data = Vec::new();
482 for result in iter {
483 match result {
484 Ok(item) => data.push(item),
485 Err(e) => return Err(to_pyvalue_err(e)),
486 }
487 }
488
489 Ok(data)
490 }
491}
492
493fn exhaust_data_iter_to_pycapsule(
494 py: Python,
495 iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
496) -> anyhow::Result<Py<PyAny>> {
497 let mut data = Vec::new();
498 for result in iter {
499 match result {
500 Ok((Some(item1), None)) => data.push(item1),
501 Ok((None, Some(item2))) => data.push(item2),
502 Ok((Some(item1), Some(item2))) => {
503 data.push(item1);
504 data.push(item2);
505 }
506 Ok((None, None)) => {
507 continue;
508 }
509 Err(e) => return Err(e),
510 }
511 }
512
513 let cvec: CVec = data.into();
514 let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
515
516 Ok(capsule.into_py_any_unwrap(py))
519}