nautilus_serialization/python/
arrow.rs1use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21 data::{
22 Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick, close::InstrumentClose,
24 },
25 python::data::{
26 pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27 pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
28 pyobjects_to_trades,
29 },
30};
31use pyo3::{
32 conversion::IntoPyObjectExt,
33 exceptions::{PyRuntimeError, PyTypeError, PyValueError},
34 prelude::*,
35 types::{PyBytes, PyType},
36};
37
38use crate::arrow::{
39 ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
40 book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
41 instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
42 quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
43};
44
45fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
47 let mut cursor = Cursor::new(Vec::new());
49 {
50 let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
51 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
52
53 writer
54 .write(&batch)
55 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
56
57 writer
58 .finish()
59 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
60 }
61
62 let buffer = cursor.into_inner();
63 let pybytes = PyBytes::new(py, &buffer);
64
65 Ok(pybytes.into())
66}
67
68#[pyfunction]
74pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
75 let cls_str: String = cls.getattr("__name__")?.extract()?;
76 let result_map = match cls_str.as_str() {
77 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
78 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
79 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
80 stringify!(TradeTick) => TradeTick::get_schema_map(),
81 stringify!(Bar) => Bar::get_schema_map(),
82 stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
83 stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
84 stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
85 _ => {
86 return Err(PyTypeError::new_err(format!(
87 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
88 )));
89 }
90 };
91
92 result_map.into_py_any(py)
93}
94
95#[pyfunction]
108pub fn pyobjects_to_arrow_record_batch_bytes(
109 py: Python,
110 data: Vec<Bound<'_, PyAny>>,
111) -> PyResult<Py<PyBytes>> {
112 if data.is_empty() {
113 return Err(to_pyvalue_err("Empty data"));
114 }
115
116 let data_type: String = data
117 .first()
118 .unwrap() .getattr("__class__")?
120 .getattr("__name__")?
121 .extract()?;
122
123 match data_type.as_str() {
124 stringify!(OrderBookDelta) => {
125 let deltas = pyobjects_to_book_deltas(data)?;
126 py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
127 }
128 stringify!(OrderBookDepth10) => {
129 let depth_snapshots: Vec<OrderBookDepth10> = data
130 .into_iter()
131 .map(|obj| obj.extract::<OrderBookDepth10>())
132 .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
133 py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
134 }
135 stringify!(QuoteTick) => {
136 let quotes = pyobjects_to_quotes(data)?;
137 py_quotes_to_arrow_record_batch_bytes(py, quotes)
138 }
139 stringify!(TradeTick) => {
140 let trades = pyobjects_to_trades(data)?;
141 py_trades_to_arrow_record_batch_bytes(py, trades)
142 }
143 stringify!(Bar) => {
144 let bars = pyobjects_to_bars(data)?;
145 py_bars_to_arrow_record_batch_bytes(py, bars)
146 }
147 stringify!(MarkPriceUpdate) => {
148 let updates = pyobjects_to_mark_prices(data)?;
149 py_mark_prices_to_arrow_record_batch_bytes(py, updates)
150 }
151 stringify!(IndexPriceUpdate) => {
152 let index_prices = pyobjects_to_index_prices(data)?;
153 py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
154 }
155 stringify!(InstrumentClose) => {
156 let closes = pyobjects_to_instrument_closes(data)?;
157 py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
158 }
159 _ => Err(PyValueError::new_err(format!(
160 "unsupported data type: {data_type}"
161 ))),
162 }
163}
164
165#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
171pub fn py_book_deltas_to_arrow_record_batch_bytes(
172 py: Python,
173 data: Vec<OrderBookDelta>,
174) -> PyResult<Py<PyBytes>> {
175 match book_deltas_to_arrow_record_batch_bytes(data) {
176 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
177 Err(e) => Err(to_pyvalue_err(e)),
178 }
179}
180
181#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
187pub fn py_book_depth10_to_arrow_record_batch_bytes(
188 py: Python,
189 data: Vec<OrderBookDepth10>,
190) -> PyResult<Py<PyBytes>> {
191 match book_depth10_to_arrow_record_batch_bytes(data) {
192 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
193 Err(e) => Err(to_pyvalue_err(e)),
194 }
195}
196
197#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
203pub fn py_quotes_to_arrow_record_batch_bytes(
204 py: Python,
205 data: Vec<QuoteTick>,
206) -> PyResult<Py<PyBytes>> {
207 match quotes_to_arrow_record_batch_bytes(data) {
208 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
209 Err(e) => Err(to_pyvalue_err(e)),
210 }
211}
212
213#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
219pub fn py_trades_to_arrow_record_batch_bytes(
220 py: Python,
221 data: Vec<TradeTick>,
222) -> PyResult<Py<PyBytes>> {
223 match trades_to_arrow_record_batch_bytes(data) {
224 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
225 Err(e) => Err(to_pyvalue_err(e)),
226 }
227}
228
229#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
235pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
236 match bars_to_arrow_record_batch_bytes(data) {
237 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
238 Err(e) => Err(to_pyvalue_err(e)),
239 }
240}
241
242#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
248pub fn py_mark_prices_to_arrow_record_batch_bytes(
249 py: Python,
250 data: Vec<MarkPriceUpdate>,
251) -> PyResult<Py<PyBytes>> {
252 match mark_prices_to_arrow_record_batch_bytes(data) {
253 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
254 Err(e) => Err(to_pyvalue_err(e)),
255 }
256}
257
258#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
264pub fn py_index_prices_to_arrow_record_batch_bytes(
265 py: Python,
266 data: Vec<IndexPriceUpdate>,
267) -> PyResult<Py<PyBytes>> {
268 match index_prices_to_arrow_record_batch_bytes(data) {
269 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
270 Err(e) => Err(to_pyvalue_err(e)),
271 }
272}
273
274#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
280pub fn py_instrument_closes_to_arrow_record_batch_bytes(
281 py: Python,
282 data: Vec<InstrumentClose>,
283) -> PyResult<Py<PyBytes>> {
284 match instrument_closes_to_arrow_record_batch_bytes(data) {
285 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
286 Err(e) => Err(to_pyvalue_err(e)),
287 }
288}