nautilus_persistence/python/wranglers/
delta.rs1use std::{collections::HashMap, io::Cursor, str::FromStr};
17
18use datafusion::arrow::ipc::reader::StreamReader;
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{data::OrderBookDelta, identifiers::InstrumentId};
21use nautilus_serialization::arrow::DecodeFromRecordBatch;
22use pyo3::prelude::*;
23
24#[pyclass()]
25pub struct OrderBookDeltaDataWrangler {
26 instrument_id: InstrumentId,
27 price_precision: u8,
28 size_precision: u8,
29 metadata: HashMap<String, String>,
30}
31
32#[pymethods]
33impl OrderBookDeltaDataWrangler {
34 #[new]
35 fn py_new(instrument_id: &str, price_precision: u8, size_precision: u8) -> PyResult<Self> {
36 let instrument_id = InstrumentId::from_str(instrument_id).map_err(to_pyvalue_err)?;
37 let metadata =
38 OrderBookDelta::get_metadata(&instrument_id, price_precision, size_precision);
39
40 Ok(Self {
41 instrument_id,
42 price_precision,
43 size_precision,
44 metadata,
45 })
46 }
47
48 #[getter]
49 fn instrument_id(&self) -> String {
50 self.instrument_id.to_string()
51 }
52
53 #[getter]
54 const fn price_precision(&self) -> u8 {
55 self.price_precision
56 }
57
58 #[getter]
59 const fn size_precision(&self) -> u8 {
60 self.size_precision
61 }
62
63 fn process_record_batch_bytes(&self, data: &[u8]) -> PyResult<Vec<OrderBookDelta>> {
64 let cursor = Cursor::new(data);
66 let reader = match StreamReader::try_new(cursor, None) {
67 Ok(reader) => reader,
68 Err(e) => return Err(to_pyvalue_err(e)),
69 };
70
71 let mut deltas = Vec::new();
72
73 for maybe_batch in reader {
75 let record_batch = match maybe_batch {
76 Ok(record_batch) => record_batch,
77 Err(e) => return Err(to_pyvalue_err(e)),
78 };
79
80 let batch_deltas = OrderBookDelta::decode_batch(&self.metadata, record_batch)
81 .map_err(to_pyvalue_err)?;
82 deltas.extend(batch_deltas);
83 }
84
85 Ok(deltas)
86 }
87}