nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// TODO: We still rely on `IntoPy` for now, so temporarily ignore
17// these deprecations until fully migrated to `IntoPyObject`.
18#![allow(deprecated)]
19
20use std::io::Cursor;
21
22use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
23use nautilus_core::python::to_pyvalue_err;
24use nautilus_model::{
25    data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
26    python::data::{
27        pyobjects_to_bars, pyobjects_to_order_book_deltas, pyobjects_to_quote_ticks,
28        pyobjects_to_trade_ticks,
29    },
30};
31use pyo3::{
32    exceptions::{PyRuntimeError, PyTypeError, PyValueError},
33    prelude::*,
34    types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38    bars_to_arrow_record_batch_bytes, order_book_deltas_to_arrow_record_batch_bytes,
39    order_book_depth10_to_arrow_record_batch_bytes, quote_ticks_to_arrow_record_batch_bytes,
40    trade_ticks_to_arrow_record_batch_bytes, ArrowSchemaProvider,
41};
42
43/// Transforms the given record `batches` into Python `bytes`.
44fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
45    // Create a cursor to write to a byte array in memory
46    let mut cursor = Cursor::new(Vec::new());
47    {
48        let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
49            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
50
51        writer
52            .write(&batch)
53            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
54
55        writer
56            .finish()
57            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
58    }
59
60    let buffer = cursor.into_inner();
61    let pybytes = PyBytes::new(py, &buffer);
62
63    Ok(pybytes.into())
64}
65
66#[pyfunction]
67pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
68    let cls_str: String = cls.getattr("__name__")?.extract()?;
69    let result_map = match cls_str.as_str() {
70        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
71        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
72        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
73        stringify!(TradeTick) => TradeTick::get_schema_map(),
74        stringify!(Bar) => Bar::get_schema_map(),
75        _ => {
76            return Err(PyTypeError::new_err(format!(
77                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
78            )));
79        }
80    };
81
82    Ok(result_map.into_py(py))
83}
84
85/// Return Python `bytes` from the given list of 'legacy' data objects, which can be passed
86/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
87#[pyfunction]
88pub fn pyobjects_to_arrow_record_batch_bytes(
89    py: Python,
90    data: Vec<Bound<'_, PyAny>>,
91) -> PyResult<Py<PyBytes>> {
92    if data.is_empty() {
93        return Err(to_pyvalue_err("Empty data"));
94    }
95
96    let data_type: String = data
97        .first()
98        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
99        .as_ref()
100        .getattr("__class__")?
101        .getattr("__name__")?
102        .extract()?;
103
104    match data_type.as_str() {
105        stringify!(OrderBookDelta) => {
106            let deltas = pyobjects_to_order_book_deltas(data)?;
107            py_order_book_deltas_to_arrow_record_batch_bytes(py, deltas)
108        }
109        stringify!(QuoteTick) => {
110            let quotes = pyobjects_to_quote_ticks(data)?;
111            py_quote_ticks_to_arrow_record_batch_bytes(py, quotes)
112        }
113        stringify!(TradeTick) => {
114            let trades = pyobjects_to_trade_ticks(data)?;
115            py_trade_ticks_to_arrow_record_batch_bytes(py, trades)
116        }
117        stringify!(Bar) => {
118            let bars = pyobjects_to_bars(data)?;
119            py_bars_to_arrow_record_batch_bytes(py, bars)
120        }
121        _ => Err(PyValueError::new_err(format!(
122            "unsupported data type: {data_type}"
123        ))),
124    }
125}
126
127#[pyfunction(name = "order_book_deltas_to_arrow_record_batch_bytes")]
128pub fn py_order_book_deltas_to_arrow_record_batch_bytes(
129    py: Python,
130    data: Vec<OrderBookDelta>,
131) -> PyResult<Py<PyBytes>> {
132    match order_book_deltas_to_arrow_record_batch_bytes(data) {
133        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
134        Err(e) => Err(to_pyvalue_err(e)),
135    }
136}
137
138#[pyfunction(name = "order_book_depth10_to_arrow_record_batch_bytes")]
139pub fn py_order_book_depth10_to_arrow_record_batch_bytes(
140    py: Python,
141    data: Vec<OrderBookDepth10>,
142) -> PyResult<Py<PyBytes>> {
143    match order_book_depth10_to_arrow_record_batch_bytes(data) {
144        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
145        Err(e) => Err(to_pyvalue_err(e)),
146    }
147}
148
149#[pyfunction(name = "quote_ticks_to_arrow_record_batch_bytes")]
150pub fn py_quote_ticks_to_arrow_record_batch_bytes(
151    py: Python,
152    data: Vec<QuoteTick>,
153) -> PyResult<Py<PyBytes>> {
154    match quote_ticks_to_arrow_record_batch_bytes(data) {
155        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
156        Err(e) => Err(to_pyvalue_err(e)),
157    }
158}
159
160#[pyfunction(name = "trade_ticks_to_arrow_record_batch_bytes")]
161pub fn py_trade_ticks_to_arrow_record_batch_bytes(
162    py: Python,
163    data: Vec<TradeTick>,
164) -> PyResult<Py<PyBytes>> {
165    match trade_ticks_to_arrow_record_batch_bytes(data) {
166        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
167        Err(e) => Err(to_pyvalue_err(e)),
168    }
169}
170
171#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
172pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
173    match bars_to_arrow_record_batch_bytes(data) {
174        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
175        Err(e) => Err(to_pyvalue_err(e)),
176    }
177}