nautilus_persistence/python/wranglers/
bar.rs1use std::{collections::HashMap, io::Cursor, str::FromStr};
17
18use datafusion::arrow::ipc::reader::StreamReader;
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::data::bar::{Bar, BarType};
21use nautilus_serialization::arrow::DecodeFromRecordBatch;
22use pyo3::prelude::*;
23
24#[pyclass]
25pub struct BarDataWrangler {
26 bar_type: BarType,
27 price_precision: u8,
28 size_precision: u8,
29 metadata: HashMap<String, String>,
30}
31
32#[pymethods]
33impl BarDataWrangler {
34 #[new]
35 fn py_new(bar_type: &str, price_precision: u8, size_precision: u8) -> PyResult<Self> {
36 let bar_type = BarType::from_str(bar_type).map_err(to_pyvalue_err)?;
37 let metadata = Bar::get_metadata(&bar_type, price_precision, size_precision);
38
39 Ok(Self {
40 bar_type,
41 price_precision,
42 size_precision,
43 metadata,
44 })
45 }
46
47 #[getter]
48 fn bar_type(&self) -> String {
49 self.bar_type.to_string()
50 }
51
52 #[getter]
53 const fn price_precision(&self) -> u8 {
54 self.price_precision
55 }
56
57 #[getter]
58 const fn size_precision(&self) -> u8 {
59 self.size_precision
60 }
61
62 fn process_record_batch_bytes(&self, data: &[u8]) -> PyResult<Vec<Bar>> {
63 let cursor = Cursor::new(data);
65 let reader = match StreamReader::try_new(cursor, None) {
66 Ok(reader) => reader,
67 Err(e) => return Err(to_pyvalue_err(e)),
68 };
69
70 let mut bars = Vec::new();
71
72 for maybe_batch in reader {
74 let record_batch = match maybe_batch {
75 Ok(record_batch) => record_batch,
76 Err(e) => return Err(to_pyvalue_err(e)),
77 };
78
79 let batch_bars =
80 Bar::decode_batch(&self.metadata, record_batch).map_err(to_pyvalue_err)?;
81 bars.extend(batch_bars);
82 }
83
84 Ok(bars)
85 }
86}