nautilus_persistence/python/wranglers/
quote.rs1use std::{collections::HashMap, io::Cursor, str::FromStr};
17
18use datafusion::arrow::ipc::reader::StreamReader;
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{data::QuoteTick, identifiers::InstrumentId};
21use nautilus_serialization::arrow::DecodeFromRecordBatch;
22use pyo3::prelude::*;
23
24#[pyclass]
25pub struct QuoteTickDataWrangler {
26 instrument_id: InstrumentId,
27 price_precision: u8,
28 size_precision: u8,
29 metadata: HashMap<String, String>,
30}
31
32#[pymethods]
33impl QuoteTickDataWrangler {
34 #[new]
35 fn py_new(instrument_id: &str, price_precision: u8, size_precision: u8) -> PyResult<Self> {
36 let instrument_id = InstrumentId::from_str(instrument_id).map_err(to_pyvalue_err)?;
37 let metadata = QuoteTick::get_metadata(&instrument_id, price_precision, size_precision);
38
39 Ok(Self {
40 instrument_id,
41 price_precision,
42 size_precision,
43 metadata,
44 })
45 }
46
47 #[getter]
48 fn instrument_id(&self) -> String {
49 self.instrument_id.to_string()
50 }
51
52 #[getter]
53 const fn price_precision(&self) -> u8 {
54 self.price_precision
55 }
56
57 #[getter]
58 const fn size_precision(&self) -> u8 {
59 self.size_precision
60 }
61
62 fn process_record_batch_bytes(&self, data: &[u8]) -> PyResult<Vec<QuoteTick>> {
63 let cursor = Cursor::new(data);
65 let reader = match StreamReader::try_new(cursor, None) {
66 Ok(reader) => reader,
67 Err(e) => return Err(to_pyvalue_err(e)),
68 };
69
70 let mut quotes = Vec::new();
71
72 for maybe_batch in reader {
74 let record_batch = match maybe_batch {
75 Ok(record_batch) => record_batch,
76 Err(e) => return Err(to_pyvalue_err(e)),
77 };
78
79 let batch_deltas =
80 QuoteTick::decode_batch(&self.metadata, record_batch).map_err(to_pyvalue_err)?;
81 quotes.extend(batch_deltas);
82 }
83
84 Ok(quotes)
85 }
86}