nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21    data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
22    python::data::{
23        pyobjects_to_bars, pyobjects_to_order_book_deltas, pyobjects_to_quote_ticks,
24        pyobjects_to_trade_ticks,
25    },
26};
27use pyo3::{
28    conversion::IntoPyObjectExt,
29    exceptions::{PyRuntimeError, PyTypeError, PyValueError},
30    prelude::*,
31    types::{PyBytes, PyType},
32};
33
34use crate::arrow::{
35    ArrowSchemaProvider, bars_to_arrow_record_batch_bytes,
36    order_book_deltas_to_arrow_record_batch_bytes, order_book_depth10_to_arrow_record_batch_bytes,
37    quote_ticks_to_arrow_record_batch_bytes, trade_ticks_to_arrow_record_batch_bytes,
38};
39
40/// Transforms the given record `batches` into Python `bytes`.
41fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
42    // Create a cursor to write to a byte array in memory
43    let mut cursor = Cursor::new(Vec::new());
44    {
45        let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
46            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
47
48        writer
49            .write(&batch)
50            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
51
52        writer
53            .finish()
54            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
55    }
56
57    let buffer = cursor.into_inner();
58    let pybytes = PyBytes::new(py, &buffer);
59
60    Ok(pybytes.into())
61}
62
63#[pyfunction]
64pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
65    let cls_str: String = cls.getattr("__name__")?.extract()?;
66    let result_map = match cls_str.as_str() {
67        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
68        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
69        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
70        stringify!(TradeTick) => TradeTick::get_schema_map(),
71        stringify!(Bar) => Bar::get_schema_map(),
72        _ => {
73            return Err(PyTypeError::new_err(format!(
74                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
75            )));
76        }
77    };
78
79    result_map.into_py_any(py)
80}
81
82/// Return Python `bytes` from the given list of 'legacy' data objects, which can be passed
83/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
84#[pyfunction]
85pub fn pyobjects_to_arrow_record_batch_bytes(
86    py: Python,
87    data: Vec<Bound<'_, PyAny>>,
88) -> PyResult<Py<PyBytes>> {
89    if data.is_empty() {
90        return Err(to_pyvalue_err("Empty data"));
91    }
92
93    let data_type: String = data
94        .first()
95        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
96        .as_ref()
97        .getattr("__class__")?
98        .getattr("__name__")?
99        .extract()?;
100
101    match data_type.as_str() {
102        stringify!(OrderBookDelta) => {
103            let deltas = pyobjects_to_order_book_deltas(data)?;
104            py_order_book_deltas_to_arrow_record_batch_bytes(py, deltas)
105        }
106        stringify!(QuoteTick) => {
107            let quotes = pyobjects_to_quote_ticks(data)?;
108            py_quote_ticks_to_arrow_record_batch_bytes(py, quotes)
109        }
110        stringify!(TradeTick) => {
111            let trades = pyobjects_to_trade_ticks(data)?;
112            py_trade_ticks_to_arrow_record_batch_bytes(py, trades)
113        }
114        stringify!(Bar) => {
115            let bars = pyobjects_to_bars(data)?;
116            py_bars_to_arrow_record_batch_bytes(py, bars)
117        }
118        _ => Err(PyValueError::new_err(format!(
119            "unsupported data type: {data_type}"
120        ))),
121    }
122}
123
124#[pyfunction(name = "order_book_deltas_to_arrow_record_batch_bytes")]
125pub fn py_order_book_deltas_to_arrow_record_batch_bytes(
126    py: Python,
127    data: Vec<OrderBookDelta>,
128) -> PyResult<Py<PyBytes>> {
129    match order_book_deltas_to_arrow_record_batch_bytes(data) {
130        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
131        Err(e) => Err(to_pyvalue_err(e)),
132    }
133}
134
135#[pyfunction(name = "order_book_depth10_to_arrow_record_batch_bytes")]
136pub fn py_order_book_depth10_to_arrow_record_batch_bytes(
137    py: Python,
138    data: Vec<OrderBookDepth10>,
139) -> PyResult<Py<PyBytes>> {
140    match order_book_depth10_to_arrow_record_batch_bytes(data) {
141        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
142        Err(e) => Err(to_pyvalue_err(e)),
143    }
144}
145
146#[pyfunction(name = "quote_ticks_to_arrow_record_batch_bytes")]
147pub fn py_quote_ticks_to_arrow_record_batch_bytes(
148    py: Python,
149    data: Vec<QuoteTick>,
150) -> PyResult<Py<PyBytes>> {
151    match quote_ticks_to_arrow_record_batch_bytes(data) {
152        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
153        Err(e) => Err(to_pyvalue_err(e)),
154    }
155}
156
157#[pyfunction(name = "trade_ticks_to_arrow_record_batch_bytes")]
158pub fn py_trade_ticks_to_arrow_record_batch_bytes(
159    py: Python,
160    data: Vec<TradeTick>,
161) -> PyResult<Py<PyBytes>> {
162    match trade_ticks_to_arrow_record_batch_bytes(data) {
163        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
164        Err(e) => Err(to_pyvalue_err(e)),
165    }
166}
167
168#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
169pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
170    match bars_to_arrow_record_batch_bytes(data) {
171        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
172        Err(e) => Err(to_pyvalue_err(e)),
173    }
174}