nautilus_serialization/python/
arrow.rs
1use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21 data::{
22 Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick, close::InstrumentClose,
24 },
25 python::data::{
26 pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27 pyobjects_to_mark_prices, pyobjects_to_quotes, pyobjects_to_trades,
28 },
29};
30use pyo3::{
31 conversion::IntoPyObjectExt,
32 exceptions::{PyRuntimeError, PyTypeError, PyValueError},
33 prelude::*,
34 types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38 ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39 book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40 instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41 quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
46 let mut cursor = Cursor::new(Vec::new());
48 {
49 let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
50 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
51
52 writer
53 .write(&batch)
54 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
55
56 writer
57 .finish()
58 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
59 }
60
61 let buffer = cursor.into_inner();
62 let pybytes = PyBytes::new(py, &buffer);
63
64 Ok(pybytes.into())
65}
66
67#[pyfunction]
68pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
69 let cls_str: String = cls.getattr("__name__")?.extract()?;
70 let result_map = match cls_str.as_str() {
71 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
72 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
73 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
74 stringify!(TradeTick) => TradeTick::get_schema_map(),
75 stringify!(Bar) => Bar::get_schema_map(),
76 stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
77 stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
78 _ => {
79 return Err(PyTypeError::new_err(format!(
80 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
81 )));
82 }
83 };
84
85 result_map.into_py_any(py)
86}
87
88#[pyfunction]
91pub fn pyobjects_to_arrow_record_batch_bytes(
92 py: Python,
93 data: Vec<Bound<'_, PyAny>>,
94) -> PyResult<Py<PyBytes>> {
95 if data.is_empty() {
96 return Err(to_pyvalue_err("Empty data"));
97 }
98
99 let data_type: String = data
100 .first()
101 .unwrap() .as_ref()
103 .getattr("__class__")?
104 .getattr("__name__")?
105 .extract()?;
106
107 match data_type.as_str() {
108 stringify!(OrderBookDelta) => {
109 let deltas = pyobjects_to_book_deltas(data)?;
110 py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
111 }
112 stringify!(QuoteTick) => {
113 let quotes = pyobjects_to_quotes(data)?;
114 py_quotes_to_arrow_record_batch_bytes(py, quotes)
115 }
116 stringify!(TradeTick) => {
117 let trades = pyobjects_to_trades(data)?;
118 py_trades_to_arrow_record_batch_bytes(py, trades)
119 }
120 stringify!(Bar) => {
121 let bars = pyobjects_to_bars(data)?;
122 py_bars_to_arrow_record_batch_bytes(py, bars)
123 }
124 stringify!(MarkPriceUpdate) => {
125 let updates = pyobjects_to_mark_prices(data)?;
126 py_mark_prices_to_arrow_record_batch_bytes(py, updates)
127 }
128 stringify!(IndexPriceUpdate) => {
129 let index_prices = pyobjects_to_index_prices(data)?;
130 py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
131 }
132 stringify!(InstrumentClose) => {
133 let closes = pyobjects_to_index_prices(data)?;
134 py_index_prices_to_arrow_record_batch_bytes(py, closes)
135 }
136 _ => Err(PyValueError::new_err(format!(
137 "unsupported data type: {data_type}"
138 ))),
139 }
140}
141
142#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
143pub fn py_book_deltas_to_arrow_record_batch_bytes(
144 py: Python,
145 data: Vec<OrderBookDelta>,
146) -> PyResult<Py<PyBytes>> {
147 match book_deltas_to_arrow_record_batch_bytes(data) {
148 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
149 Err(e) => Err(to_pyvalue_err(e)),
150 }
151}
152
153#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
154pub fn py_book_depth10_to_arrow_record_batch_bytes(
155 py: Python,
156 data: Vec<OrderBookDepth10>,
157) -> PyResult<Py<PyBytes>> {
158 match book_depth10_to_arrow_record_batch_bytes(data) {
159 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
160 Err(e) => Err(to_pyvalue_err(e)),
161 }
162}
163
164#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
165pub fn py_quotes_to_arrow_record_batch_bytes(
166 py: Python,
167 data: Vec<QuoteTick>,
168) -> PyResult<Py<PyBytes>> {
169 match quotes_to_arrow_record_batch_bytes(data) {
170 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
171 Err(e) => Err(to_pyvalue_err(e)),
172 }
173}
174
175#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
176pub fn py_trades_to_arrow_record_batch_bytes(
177 py: Python,
178 data: Vec<TradeTick>,
179) -> PyResult<Py<PyBytes>> {
180 match trades_to_arrow_record_batch_bytes(data) {
181 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
182 Err(e) => Err(to_pyvalue_err(e)),
183 }
184}
185
186#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
187pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
188 match bars_to_arrow_record_batch_bytes(data) {
189 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
190 Err(e) => Err(to_pyvalue_err(e)),
191 }
192}
193
194#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
195pub fn py_mark_prices_to_arrow_record_batch_bytes(
196 py: Python,
197 data: Vec<MarkPriceUpdate>,
198) -> PyResult<Py<PyBytes>> {
199 match mark_prices_to_arrow_record_batch_bytes(data) {
200 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
201 Err(e) => Err(to_pyvalue_err(e)),
202 }
203}
204
205#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
206pub fn py_index_prices_to_arrow_record_batch_bytes(
207 py: Python,
208 data: Vec<IndexPriceUpdate>,
209) -> PyResult<Py<PyBytes>> {
210 match index_prices_to_arrow_record_batch_bytes(data) {
211 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
212 Err(e) => Err(to_pyvalue_err(e)),
213 }
214}
215
216#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
217pub fn py_instrument_closes_to_arrow_record_batch_bytes(
218 py: Python,
219 data: Vec<InstrumentClose>,
220) -> PyResult<Py<PyBytes>> {
221 match instrument_closes_to_arrow_record_batch_bytes(data) {
222 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
223 Err(e) => Err(to_pyvalue_err(e)),
224 }
225}