nautilus_serialization/python/
arrow.rs1use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21 data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
22 python::data::{
23 pyobjects_to_bars, pyobjects_to_order_book_deltas, pyobjects_to_quote_ticks,
24 pyobjects_to_trade_ticks,
25 },
26};
27use pyo3::{
28 conversion::IntoPyObjectExt,
29 exceptions::{PyRuntimeError, PyTypeError, PyValueError},
30 prelude::*,
31 types::{PyBytes, PyType},
32};
33
34use crate::arrow::{
35 ArrowSchemaProvider, bars_to_arrow_record_batch_bytes,
36 order_book_deltas_to_arrow_record_batch_bytes, order_book_depth10_to_arrow_record_batch_bytes,
37 quote_ticks_to_arrow_record_batch_bytes, trade_ticks_to_arrow_record_batch_bytes,
38};
39
40fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
42 let mut cursor = Cursor::new(Vec::new());
44 {
45 let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
46 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
47
48 writer
49 .write(&batch)
50 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
51
52 writer
53 .finish()
54 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
55 }
56
57 let buffer = cursor.into_inner();
58 let pybytes = PyBytes::new(py, &buffer);
59
60 Ok(pybytes.into())
61}
62
63#[pyfunction]
64pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
65 let cls_str: String = cls.getattr("__name__")?.extract()?;
66 let result_map = match cls_str.as_str() {
67 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
68 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
69 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
70 stringify!(TradeTick) => TradeTick::get_schema_map(),
71 stringify!(Bar) => Bar::get_schema_map(),
72 _ => {
73 return Err(PyTypeError::new_err(format!(
74 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
75 )));
76 }
77 };
78
79 result_map.into_py_any(py)
80}
81
82#[pyfunction]
85pub fn pyobjects_to_arrow_record_batch_bytes(
86 py: Python,
87 data: Vec<Bound<'_, PyAny>>,
88) -> PyResult<Py<PyBytes>> {
89 if data.is_empty() {
90 return Err(to_pyvalue_err("Empty data"));
91 }
92
93 let data_type: String = data
94 .first()
95 .unwrap() .as_ref()
97 .getattr("__class__")?
98 .getattr("__name__")?
99 .extract()?;
100
101 match data_type.as_str() {
102 stringify!(OrderBookDelta) => {
103 let deltas = pyobjects_to_order_book_deltas(data)?;
104 py_order_book_deltas_to_arrow_record_batch_bytes(py, deltas)
105 }
106 stringify!(QuoteTick) => {
107 let quotes = pyobjects_to_quote_ticks(data)?;
108 py_quote_ticks_to_arrow_record_batch_bytes(py, quotes)
109 }
110 stringify!(TradeTick) => {
111 let trades = pyobjects_to_trade_ticks(data)?;
112 py_trade_ticks_to_arrow_record_batch_bytes(py, trades)
113 }
114 stringify!(Bar) => {
115 let bars = pyobjects_to_bars(data)?;
116 py_bars_to_arrow_record_batch_bytes(py, bars)
117 }
118 _ => Err(PyValueError::new_err(format!(
119 "unsupported data type: {data_type}"
120 ))),
121 }
122}
123
124#[pyfunction(name = "order_book_deltas_to_arrow_record_batch_bytes")]
125pub fn py_order_book_deltas_to_arrow_record_batch_bytes(
126 py: Python,
127 data: Vec<OrderBookDelta>,
128) -> PyResult<Py<PyBytes>> {
129 match order_book_deltas_to_arrow_record_batch_bytes(data) {
130 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
131 Err(e) => Err(to_pyvalue_err(e)),
132 }
133}
134
135#[pyfunction(name = "order_book_depth10_to_arrow_record_batch_bytes")]
136pub fn py_order_book_depth10_to_arrow_record_batch_bytes(
137 py: Python,
138 data: Vec<OrderBookDepth10>,
139) -> PyResult<Py<PyBytes>> {
140 match order_book_depth10_to_arrow_record_batch_bytes(data) {
141 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
142 Err(e) => Err(to_pyvalue_err(e)),
143 }
144}
145
146#[pyfunction(name = "quote_ticks_to_arrow_record_batch_bytes")]
147pub fn py_quote_ticks_to_arrow_record_batch_bytes(
148 py: Python,
149 data: Vec<QuoteTick>,
150) -> PyResult<Py<PyBytes>> {
151 match quote_ticks_to_arrow_record_batch_bytes(data) {
152 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
153 Err(e) => Err(to_pyvalue_err(e)),
154 }
155}
156
157#[pyfunction(name = "trade_ticks_to_arrow_record_batch_bytes")]
158pub fn py_trade_ticks_to_arrow_record_batch_bytes(
159 py: Python,
160 data: Vec<TradeTick>,
161) -> PyResult<Py<PyBytes>> {
162 match trade_ticks_to_arrow_record_batch_bytes(data) {
163 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
164 Err(e) => Err(to_pyvalue_err(e)),
165 }
166}
167
168#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
169pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
170 match bars_to_arrow_record_batch_bytes(data) {
171 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
172 Err(e) => Err(to_pyvalue_err(e)),
173 }
174}