1use std::{collections::HashMap, path::PathBuf};
17
18use databento::dbn;
19use nautilus_core::{
20 ffi::cvec::CVec,
21 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
22};
23use nautilus_model::{
24 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
25 identifiers::{InstrumentId, Venue},
26 python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29 prelude::*,
30 types::{PyCapsule, PyList},
31};
32use ustr::Ustr;
33
34use crate::{
35 loader::DatabentoDataLoader,
36 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
37};
38
39#[pymethods]
40impl DatabentoDataLoader {
41 #[new]
42 #[pyo3(signature = (publishers_filepath=None))]
43 fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
44 Self::new(publishers_filepath).map_err(to_pyvalue_err)
45 }
46
47 #[pyo3(name = "load_publishers")]
48 fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
49 self.load_publishers(publishers_filepath)
50 .map_err(to_pyvalue_err)
51 }
52
53 #[must_use]
54 #[pyo3(name = "get_publishers")]
55 fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
56 self.get_publishers()
57 .iter()
58 .map(|(&key, value)| (key, value.clone()))
59 .collect::<HashMap<u16, DatabentoPublisher>>()
60 }
61
62 #[pyo3(name = "set_dataset_for_venue")]
63 fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
64 self.set_dataset_for_venue(Ustr::from(&dataset), venue);
65 }
66
67 #[must_use]
68 #[pyo3(name = "get_dataset_for_venue")]
69 fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
70 self.get_dataset_for_venue(venue).map(ToString::to_string)
71 }
72
73 #[must_use]
74 #[pyo3(name = "get_venue_for_publisher")]
75 fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
76 self.get_venue_for_publisher(publisher_id)
77 .map(ToString::to_string)
78 }
79
80 #[pyo3(name = "schema_for_file")]
81 fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
82 self.schema_from_file(&filepath).map_err(to_pyvalue_err)
83 }
84
85 #[pyo3(name = "load_instruments")]
86 fn py_load_instruments(
87 &mut self,
88 py: Python,
89 filepath: PathBuf,
90 use_exchange_as_venue: bool,
91 ) -> PyResult<PyObject> {
92 let iter = self
93 .load_instruments(&filepath, use_exchange_as_venue)
94 .map_err(to_pyvalue_err)?;
95
96 let mut data = Vec::new();
97 for instrument in iter {
98 let py_object = instrument_any_to_pyobject(py, instrument)?;
99 data.push(py_object);
100 }
101
102 Ok(PyList::new(py, &data)
103 .expect("Invalid `ExactSizeIterator`")
104 .into())
105 }
106
107 #[pyo3(name = "load_order_book_deltas")]
109 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
110 fn py_load_order_book_deltas(
111 &self,
112 filepath: PathBuf,
113 instrument_id: Option<InstrumentId>,
114 price_precision: Option<u8>,
115 ) -> PyResult<Vec<OrderBookDelta>> {
116 self.load_order_book_deltas(&filepath, instrument_id, price_precision)
117 .map_err(to_pyvalue_err)
118 }
119
120 #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
121 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
122 fn py_load_order_book_deltas_as_pycapsule(
123 &self,
124 py: Python,
125 filepath: PathBuf,
126 instrument_id: Option<InstrumentId>,
127 price_precision: Option<u8>,
128 include_trades: Option<bool>,
129 ) -> PyResult<PyObject> {
130 let iter = self
131 .read_records::<dbn::MboMsg>(
132 &filepath,
133 instrument_id,
134 price_precision,
135 include_trades.unwrap_or(false),
136 )
137 .map_err(to_pyvalue_err)?;
138
139 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
140 }
141
142 #[pyo3(name = "load_order_book_depth10")]
143 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
144 fn py_load_order_book_depth10(
145 &self,
146 filepath: PathBuf,
147 instrument_id: Option<InstrumentId>,
148 price_precision: Option<u8>,
149 ) -> PyResult<Vec<OrderBookDepth10>> {
150 self.load_order_book_depth10(&filepath, instrument_id, price_precision)
151 .map_err(to_pyvalue_err)
152 }
153
154 #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
155 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
156 fn py_load_order_book_depth10_as_pycapsule(
157 &self,
158 py: Python,
159 filepath: PathBuf,
160 instrument_id: Option<InstrumentId>,
161 price_precision: Option<u8>,
162 ) -> PyResult<PyObject> {
163 let iter = self
164 .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false)
165 .map_err(to_pyvalue_err)?;
166
167 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
168 }
169
170 #[pyo3(name = "load_quotes")]
171 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
172 fn py_load_quotes(
173 &self,
174 filepath: PathBuf,
175 instrument_id: Option<InstrumentId>,
176 price_precision: Option<u8>,
177 ) -> PyResult<Vec<QuoteTick>> {
178 self.load_quotes(&filepath, instrument_id, price_precision)
179 .map_err(to_pyvalue_err)
180 }
181
182 #[pyo3(name = "load_quotes_as_pycapsule")]
183 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
184 fn py_load_quotes_as_pycapsule(
185 &self,
186 py: Python,
187 filepath: PathBuf,
188 instrument_id: Option<InstrumentId>,
189 price_precision: Option<u8>,
190 include_trades: Option<bool>,
191 ) -> PyResult<PyObject> {
192 let iter = self
193 .read_records::<dbn::Mbp1Msg>(
194 &filepath,
195 instrument_id,
196 price_precision,
197 include_trades.unwrap_or(false),
198 )
199 .map_err(to_pyvalue_err)?;
200
201 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
202 }
203
204 #[pyo3(name = "load_bbo_quotes")]
205 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
206 fn py_load_bbo_quotes(
207 &self,
208 filepath: PathBuf,
209 instrument_id: Option<InstrumentId>,
210 price_precision: Option<u8>,
211 ) -> PyResult<Vec<QuoteTick>> {
212 self.load_bbo_quotes(&filepath, instrument_id, price_precision)
213 .map_err(to_pyvalue_err)
214 }
215
216 #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
217 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
218 fn py_load_bbo_quotes_as_pycapsule(
219 &self,
220 py: Python,
221 filepath: PathBuf,
222 instrument_id: Option<InstrumentId>,
223 price_precision: Option<u8>,
224 ) -> PyResult<PyObject> {
225 let iter = self
226 .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false)
227 .map_err(to_pyvalue_err)?;
228
229 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
230 }
231
232 #[pyo3(name = "load_tbbo_trades")]
233 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
234 fn py_load_tbbo_trades(
235 &self,
236 filepath: PathBuf,
237 instrument_id: Option<InstrumentId>,
238 price_precision: Option<u8>,
239 ) -> PyResult<Vec<TradeTick>> {
240 self.load_tbbo_trades(&filepath, instrument_id, price_precision)
241 .map_err(to_pyvalue_err)
242 }
243
244 #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
245 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
246 fn py_load_tbbo_trades_as_pycapsule(
247 &self,
248 py: Python,
249 filepath: PathBuf,
250 instrument_id: Option<InstrumentId>,
251 price_precision: Option<u8>,
252 ) -> PyResult<PyObject> {
253 let iter = self
254 .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false)
255 .map_err(to_pyvalue_err)?;
256
257 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
258 }
259
260 #[pyo3(name = "load_trades")]
261 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
262 fn py_load_trades(
263 &self,
264 filepath: PathBuf,
265 instrument_id: Option<InstrumentId>,
266 price_precision: Option<u8>,
267 ) -> PyResult<Vec<TradeTick>> {
268 self.load_trades(&filepath, instrument_id, price_precision)
269 .map_err(to_pyvalue_err)
270 }
271
272 #[pyo3(name = "load_trades_as_pycapsule")]
273 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
274 fn py_load_trades_as_pycapsule(
275 &self,
276 py: Python,
277 filepath: PathBuf,
278 instrument_id: Option<InstrumentId>,
279 price_precision: Option<u8>,
280 ) -> PyResult<PyObject> {
281 let iter = self
282 .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false)
283 .map_err(to_pyvalue_err)?;
284
285 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
286 }
287
288 #[pyo3(name = "load_bars")]
289 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
290 fn py_load_bars(
291 &self,
292 filepath: PathBuf,
293 instrument_id: Option<InstrumentId>,
294 price_precision: Option<u8>,
295 ) -> PyResult<Vec<Bar>> {
296 self.load_bars(&filepath, instrument_id, price_precision)
297 .map_err(to_pyvalue_err)
298 }
299
300 #[pyo3(name = "load_bars_as_pycapsule")]
301 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
302 fn py_load_bars_as_pycapsule(
303 &self,
304 py: Python,
305 filepath: PathBuf,
306 instrument_id: Option<InstrumentId>,
307 price_precision: Option<u8>,
308 ) -> PyResult<PyObject> {
309 let iter = self
310 .read_records::<dbn::OhlcvMsg>(&filepath, instrument_id, price_precision, false)
311 .map_err(to_pyvalue_err)?;
312
313 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
314 }
315
316 #[pyo3(name = "load_status")]
317 #[pyo3(signature = (filepath, instrument_id=None))]
318 fn py_load_status(
319 &self,
320 filepath: PathBuf,
321 instrument_id: Option<InstrumentId>,
322 ) -> PyResult<Vec<InstrumentStatus>> {
323 let iter = self
324 .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
325 .map_err(to_pyvalue_err)?;
326
327 let mut data = Vec::new();
328 for result in iter {
329 match result {
330 Ok(item) => data.push(item),
331 Err(e) => return Err(to_pyvalue_err(e)),
332 }
333 }
334
335 Ok(data)
336 }
337
338 #[pyo3(name = "load_imbalance")]
339 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
340 fn py_load_imbalance(
341 &self,
342 filepath: PathBuf,
343 instrument_id: Option<InstrumentId>,
344 price_precision: Option<u8>,
345 ) -> PyResult<Vec<DatabentoImbalance>> {
346 let iter = self
347 .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
348 .map_err(to_pyvalue_err)?;
349
350 let mut data = Vec::new();
351 for result in iter {
352 match result {
353 Ok(item) => data.push(item),
354 Err(e) => return Err(to_pyvalue_err(e)),
355 }
356 }
357
358 Ok(data)
359 }
360
361 #[pyo3(name = "load_statistics")]
362 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
363 fn py_load_statistics(
364 &self,
365 filepath: PathBuf,
366 instrument_id: Option<InstrumentId>,
367 price_precision: Option<u8>,
368 ) -> PyResult<Vec<DatabentoStatistics>> {
369 let iter = self
370 .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
371 .map_err(to_pyvalue_err)?;
372
373 let mut data = Vec::new();
374 for result in iter {
375 match result {
376 Ok(item) => data.push(item),
377 Err(e) => return Err(to_pyvalue_err(e)),
378 }
379 }
380
381 Ok(data)
382 }
383}
384
385fn exhaust_data_iter_to_pycapsule(
386 py: Python,
387 iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
388) -> anyhow::Result<PyObject> {
389 let mut data = Vec::new();
390 for result in iter {
391 match result {
392 Ok((Some(item1), None)) => data.push(item1),
393 Ok((None, Some(item2))) => data.push(item2),
394 Ok((Some(item1), Some(item2))) => {
395 data.push(item1);
396 data.push(item2);
397 }
398 Ok((None, None)) => {
399 continue;
400 }
401 Err(e) => return Err(e),
402 }
403 }
404
405 let cvec: CVec = data.into();
406 let capsule = PyCapsule::new::<CVec>(py, cvec, None)?;
407
408 Ok(capsule.into_py_any_unwrap(py))
411}