nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21    data::{
22        Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick, close::InstrumentClose,
24    },
25    python::data::{
26        pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27        pyobjects_to_mark_prices, pyobjects_to_quotes, pyobjects_to_trades,
28    },
29};
30use pyo3::{
31    conversion::IntoPyObjectExt,
32    exceptions::{PyRuntimeError, PyTypeError, PyValueError},
33    prelude::*,
34    types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38    ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39    book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40    instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41    quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44/// Transforms the given record `batches` into Python `bytes`.
45fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
46    // Create a cursor to write to a byte array in memory
47    let mut cursor = Cursor::new(Vec::new());
48    {
49        let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
50            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
51
52        writer
53            .write(&batch)
54            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
55
56        writer
57            .finish()
58            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
59    }
60
61    let buffer = cursor.into_inner();
62    let pybytes = PyBytes::new(py, &buffer);
63
64    Ok(pybytes.into())
65}
66
67#[pyfunction]
68pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
69    let cls_str: String = cls.getattr("__name__")?.extract()?;
70    let result_map = match cls_str.as_str() {
71        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
72        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
73        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
74        stringify!(TradeTick) => TradeTick::get_schema_map(),
75        stringify!(Bar) => Bar::get_schema_map(),
76        stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
77        stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
78        _ => {
79            return Err(PyTypeError::new_err(format!(
80                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
81            )));
82        }
83    };
84
85    result_map.into_py_any(py)
86}
87
88/// Return Python `bytes` from the given list of 'legacy' data objects, which can be passed
89/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
90#[pyfunction]
91pub fn pyobjects_to_arrow_record_batch_bytes(
92    py: Python,
93    data: Vec<Bound<'_, PyAny>>,
94) -> PyResult<Py<PyBytes>> {
95    if data.is_empty() {
96        return Err(to_pyvalue_err("Empty data"));
97    }
98
99    let data_type: String = data
100        .first()
101        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
102        .as_ref()
103        .getattr("__class__")?
104        .getattr("__name__")?
105        .extract()?;
106
107    match data_type.as_str() {
108        stringify!(OrderBookDelta) => {
109            let deltas = pyobjects_to_book_deltas(data)?;
110            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
111        }
112        stringify!(QuoteTick) => {
113            let quotes = pyobjects_to_quotes(data)?;
114            py_quotes_to_arrow_record_batch_bytes(py, quotes)
115        }
116        stringify!(TradeTick) => {
117            let trades = pyobjects_to_trades(data)?;
118            py_trades_to_arrow_record_batch_bytes(py, trades)
119        }
120        stringify!(Bar) => {
121            let bars = pyobjects_to_bars(data)?;
122            py_bars_to_arrow_record_batch_bytes(py, bars)
123        }
124        stringify!(MarkPriceUpdate) => {
125            let updates = pyobjects_to_mark_prices(data)?;
126            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
127        }
128        stringify!(IndexPriceUpdate) => {
129            let index_prices = pyobjects_to_index_prices(data)?;
130            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
131        }
132        stringify!(InstrumentClose) => {
133            let closes = pyobjects_to_index_prices(data)?;
134            py_index_prices_to_arrow_record_batch_bytes(py, closes)
135        }
136        _ => Err(PyValueError::new_err(format!(
137            "unsupported data type: {data_type}"
138        ))),
139    }
140}
141
142#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
143pub fn py_book_deltas_to_arrow_record_batch_bytes(
144    py: Python,
145    data: Vec<OrderBookDelta>,
146) -> PyResult<Py<PyBytes>> {
147    match book_deltas_to_arrow_record_batch_bytes(data) {
148        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
149        Err(e) => Err(to_pyvalue_err(e)),
150    }
151}
152
153#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
154pub fn py_book_depth10_to_arrow_record_batch_bytes(
155    py: Python,
156    data: Vec<OrderBookDepth10>,
157) -> PyResult<Py<PyBytes>> {
158    match book_depth10_to_arrow_record_batch_bytes(data) {
159        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
160        Err(e) => Err(to_pyvalue_err(e)),
161    }
162}
163
164#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
165pub fn py_quotes_to_arrow_record_batch_bytes(
166    py: Python,
167    data: Vec<QuoteTick>,
168) -> PyResult<Py<PyBytes>> {
169    match quotes_to_arrow_record_batch_bytes(data) {
170        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
171        Err(e) => Err(to_pyvalue_err(e)),
172    }
173}
174
175#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
176pub fn py_trades_to_arrow_record_batch_bytes(
177    py: Python,
178    data: Vec<TradeTick>,
179) -> PyResult<Py<PyBytes>> {
180    match trades_to_arrow_record_batch_bytes(data) {
181        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
182        Err(e) => Err(to_pyvalue_err(e)),
183    }
184}
185
186#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
187pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
188    match bars_to_arrow_record_batch_bytes(data) {
189        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
190        Err(e) => Err(to_pyvalue_err(e)),
191    }
192}
193
194#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
195pub fn py_mark_prices_to_arrow_record_batch_bytes(
196    py: Python,
197    data: Vec<MarkPriceUpdate>,
198) -> PyResult<Py<PyBytes>> {
199    match mark_prices_to_arrow_record_batch_bytes(data) {
200        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
201        Err(e) => Err(to_pyvalue_err(e)),
202    }
203}
204
205#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
206pub fn py_index_prices_to_arrow_record_batch_bytes(
207    py: Python,
208    data: Vec<IndexPriceUpdate>,
209) -> PyResult<Py<PyBytes>> {
210    match index_prices_to_arrow_record_batch_bytes(data) {
211        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
212        Err(e) => Err(to_pyvalue_err(e)),
213    }
214}
215
216#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
217pub fn py_instrument_closes_to_arrow_record_batch_bytes(
218    py: Python,
219    data: Vec<InstrumentClose>,
220) -> PyResult<Py<PyBytes>> {
221    match instrument_closes_to_arrow_record_batch_bytes(data) {
222        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
223        Err(e) => Err(to_pyvalue_err(e)),
224    }
225}