nautilus_serialization/python/
arrow.rs1#![allow(deprecated)]
19
20use std::io::Cursor;
21
22use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
23use nautilus_core::python::to_pyvalue_err;
24use nautilus_model::{
25 data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
26 python::data::{
27 pyobjects_to_bars, pyobjects_to_order_book_deltas, pyobjects_to_quote_ticks,
28 pyobjects_to_trade_ticks,
29 },
30};
31use pyo3::{
32 exceptions::{PyRuntimeError, PyTypeError, PyValueError},
33 prelude::*,
34 types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38 bars_to_arrow_record_batch_bytes, order_book_deltas_to_arrow_record_batch_bytes,
39 order_book_depth10_to_arrow_record_batch_bytes, quote_ticks_to_arrow_record_batch_bytes,
40 trade_ticks_to_arrow_record_batch_bytes, ArrowSchemaProvider,
41};
42
43fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
45 let mut cursor = Cursor::new(Vec::new());
47 {
48 let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
49 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
50
51 writer
52 .write(&batch)
53 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
54
55 writer
56 .finish()
57 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
58 }
59
60 let buffer = cursor.into_inner();
61 let pybytes = PyBytes::new(py, &buffer);
62
63 Ok(pybytes.into())
64}
65
66#[pyfunction]
67pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
68 let cls_str: String = cls.getattr("__name__")?.extract()?;
69 let result_map = match cls_str.as_str() {
70 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
71 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
72 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
73 stringify!(TradeTick) => TradeTick::get_schema_map(),
74 stringify!(Bar) => Bar::get_schema_map(),
75 _ => {
76 return Err(PyTypeError::new_err(format!(
77 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
78 )));
79 }
80 };
81
82 Ok(result_map.into_py(py))
83}
84
85#[pyfunction]
88pub fn pyobjects_to_arrow_record_batch_bytes(
89 py: Python,
90 data: Vec<Bound<'_, PyAny>>,
91) -> PyResult<Py<PyBytes>> {
92 if data.is_empty() {
93 return Err(to_pyvalue_err("Empty data"));
94 }
95
96 let data_type: String = data
97 .first()
98 .unwrap() .as_ref()
100 .getattr("__class__")?
101 .getattr("__name__")?
102 .extract()?;
103
104 match data_type.as_str() {
105 stringify!(OrderBookDelta) => {
106 let deltas = pyobjects_to_order_book_deltas(data)?;
107 py_order_book_deltas_to_arrow_record_batch_bytes(py, deltas)
108 }
109 stringify!(QuoteTick) => {
110 let quotes = pyobjects_to_quote_ticks(data)?;
111 py_quote_ticks_to_arrow_record_batch_bytes(py, quotes)
112 }
113 stringify!(TradeTick) => {
114 let trades = pyobjects_to_trade_ticks(data)?;
115 py_trade_ticks_to_arrow_record_batch_bytes(py, trades)
116 }
117 stringify!(Bar) => {
118 let bars = pyobjects_to_bars(data)?;
119 py_bars_to_arrow_record_batch_bytes(py, bars)
120 }
121 _ => Err(PyValueError::new_err(format!(
122 "unsupported data type: {data_type}"
123 ))),
124 }
125}
126
127#[pyfunction(name = "order_book_deltas_to_arrow_record_batch_bytes")]
128pub fn py_order_book_deltas_to_arrow_record_batch_bytes(
129 py: Python,
130 data: Vec<OrderBookDelta>,
131) -> PyResult<Py<PyBytes>> {
132 match order_book_deltas_to_arrow_record_batch_bytes(data) {
133 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
134 Err(e) => Err(to_pyvalue_err(e)),
135 }
136}
137
138#[pyfunction(name = "order_book_depth10_to_arrow_record_batch_bytes")]
139pub fn py_order_book_depth10_to_arrow_record_batch_bytes(
140 py: Python,
141 data: Vec<OrderBookDepth10>,
142) -> PyResult<Py<PyBytes>> {
143 match order_book_depth10_to_arrow_record_batch_bytes(data) {
144 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
145 Err(e) => Err(to_pyvalue_err(e)),
146 }
147}
148
149#[pyfunction(name = "quote_ticks_to_arrow_record_batch_bytes")]
150pub fn py_quote_ticks_to_arrow_record_batch_bytes(
151 py: Python,
152 data: Vec<QuoteTick>,
153) -> PyResult<Py<PyBytes>> {
154 match quote_ticks_to_arrow_record_batch_bytes(data) {
155 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
156 Err(e) => Err(to_pyvalue_err(e)),
157 }
158}
159
160#[pyfunction(name = "trade_ticks_to_arrow_record_batch_bytes")]
161pub fn py_trade_ticks_to_arrow_record_batch_bytes(
162 py: Python,
163 data: Vec<TradeTick>,
164) -> PyResult<Py<PyBytes>> {
165 match trade_ticks_to_arrow_record_batch_bytes(data) {
166 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
167 Err(e) => Err(to_pyvalue_err(e)),
168 }
169}
170
171#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
172pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
173 match bars_to_arrow_record_batch_bytes(data) {
174 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
175 Err(e) => Err(to_pyvalue_err(e)),
176 }
177}