nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21    data::{
22        Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick, close::InstrumentClose,
24    },
25    python::data::{
26        pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27        pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
28        pyobjects_to_trades,
29    },
30};
31use pyo3::{
32    conversion::IntoPyObjectExt,
33    exceptions::{PyRuntimeError, PyTypeError, PyValueError},
34    prelude::*,
35    types::{PyBytes, PyType},
36};
37
38use crate::arrow::{
39    ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
40    book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
41    instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
42    quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
43};
44
45/// Transforms the given record `batches` into Python `bytes`.
46fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
47    // Create a cursor to write to a byte array in memory
48    let mut cursor = Cursor::new(Vec::new());
49    {
50        let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
51            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
52
53        writer
54            .write(&batch)
55            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
56
57        writer
58            .finish()
59            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
60    }
61
62    let buffer = cursor.into_inner();
63    let pybytes = PyBytes::new(py, &buffer);
64
65    Ok(pybytes.into())
66}
67
68/// Returns a mapping from field names to Arrow data types for the given Rust data class.
69///
70/// # Errors
71///
72/// Returns a `PyErr` if the class name is not recognized or schema extraction fails.
73#[pyfunction]
74pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
75    let cls_str: String = cls.getattr("__name__")?.extract()?;
76    let result_map = match cls_str.as_str() {
77        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
78        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
79        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
80        stringify!(TradeTick) => TradeTick::get_schema_map(),
81        stringify!(Bar) => Bar::get_schema_map(),
82        stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
83        stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
84        stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
85        _ => {
86            return Err(PyTypeError::new_err(format!(
87                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
88            )));
89        }
90    };
91
92    result_map.into_py_any(py)
93}
94
95/// Returns Python `bytes` from the given list of legacy data objects, which can be passed
96/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
97///
98/// # Errors
99///
100/// Returns an error if:
101/// - The input list is empty: `PyErr`.
102/// - An unsupported data type is encountered or conversion fails: `PyErr`.
103///
104/// # Panics
105///
106/// Panics if `data.first()` returns `None` (should not occur due to emptiness check).
107#[pyfunction]
108pub fn pyobjects_to_arrow_record_batch_bytes(
109    py: Python,
110    data: Vec<Bound<'_, PyAny>>,
111) -> PyResult<Py<PyBytes>> {
112    if data.is_empty() {
113        return Err(to_pyvalue_err("Empty data"));
114    }
115
116    let data_type: String = data
117        .first()
118        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
119        .getattr("__class__")?
120        .getattr("__name__")?
121        .extract()?;
122
123    match data_type.as_str() {
124        stringify!(OrderBookDelta) => {
125            let deltas = pyobjects_to_book_deltas(data)?;
126            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
127        }
128        stringify!(OrderBookDepth10) => {
129            let depth_snapshots: Vec<OrderBookDepth10> = data
130                .into_iter()
131                .map(|obj| obj.extract::<OrderBookDepth10>())
132                .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
133            py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
134        }
135        stringify!(QuoteTick) => {
136            let quotes = pyobjects_to_quotes(data)?;
137            py_quotes_to_arrow_record_batch_bytes(py, quotes)
138        }
139        stringify!(TradeTick) => {
140            let trades = pyobjects_to_trades(data)?;
141            py_trades_to_arrow_record_batch_bytes(py, trades)
142        }
143        stringify!(Bar) => {
144            let bars = pyobjects_to_bars(data)?;
145            py_bars_to_arrow_record_batch_bytes(py, bars)
146        }
147        stringify!(MarkPriceUpdate) => {
148            let updates = pyobjects_to_mark_prices(data)?;
149            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
150        }
151        stringify!(IndexPriceUpdate) => {
152            let index_prices = pyobjects_to_index_prices(data)?;
153            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
154        }
155        stringify!(InstrumentClose) => {
156            let closes = pyobjects_to_instrument_closes(data)?;
157            py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
158        }
159        _ => Err(PyValueError::new_err(format!(
160            "unsupported data type: {data_type}"
161        ))),
162    }
163}
164
165/// Converts a list of `OrderBookDelta` into Arrow IPC bytes for Python.
166///
167/// # Errors
168///
169/// Returns a `PyErr` if encoding fails.
170#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
171pub fn py_book_deltas_to_arrow_record_batch_bytes(
172    py: Python,
173    data: Vec<OrderBookDelta>,
174) -> PyResult<Py<PyBytes>> {
175    match book_deltas_to_arrow_record_batch_bytes(data) {
176        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
177        Err(e) => Err(to_pyvalue_err(e)),
178    }
179}
180
181/// Converts a list of `OrderBookDepth10` into Arrow IPC bytes for Python.
182///
183/// # Errors
184///
185/// Returns a `PyErr` if encoding fails.
186#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
187pub fn py_book_depth10_to_arrow_record_batch_bytes(
188    py: Python,
189    data: Vec<OrderBookDepth10>,
190) -> PyResult<Py<PyBytes>> {
191    match book_depth10_to_arrow_record_batch_bytes(data) {
192        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
193        Err(e) => Err(to_pyvalue_err(e)),
194    }
195}
196
197/// Converts a list of `QuoteTick` into Arrow IPC bytes for Python.
198///
199/// # Errors
200///
201/// Returns a `PyErr` if encoding fails.
202#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
203pub fn py_quotes_to_arrow_record_batch_bytes(
204    py: Python,
205    data: Vec<QuoteTick>,
206) -> PyResult<Py<PyBytes>> {
207    match quotes_to_arrow_record_batch_bytes(data) {
208        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
209        Err(e) => Err(to_pyvalue_err(e)),
210    }
211}
212
213/// Converts a list of `TradeTick` into Arrow IPC bytes for Python.
214///
215/// # Errors
216///
217/// Returns a `PyErr` if encoding fails.
218#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
219pub fn py_trades_to_arrow_record_batch_bytes(
220    py: Python,
221    data: Vec<TradeTick>,
222) -> PyResult<Py<PyBytes>> {
223    match trades_to_arrow_record_batch_bytes(data) {
224        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
225        Err(e) => Err(to_pyvalue_err(e)),
226    }
227}
228
229/// Converts a list of `Bar` into Arrow IPC bytes for Python.
230///
231/// # Errors
232///
233/// Returns a `PyErr` if encoding fails.
234#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
235pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
236    match bars_to_arrow_record_batch_bytes(data) {
237        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
238        Err(e) => Err(to_pyvalue_err(e)),
239    }
240}
241
242/// Converts a list of `MarkPriceUpdate` into Arrow IPC bytes for Python.
243///
244/// # Errors
245///
246/// Returns a `PyErr` if encoding fails.
247#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
248pub fn py_mark_prices_to_arrow_record_batch_bytes(
249    py: Python,
250    data: Vec<MarkPriceUpdate>,
251) -> PyResult<Py<PyBytes>> {
252    match mark_prices_to_arrow_record_batch_bytes(data) {
253        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
254        Err(e) => Err(to_pyvalue_err(e)),
255    }
256}
257
258/// Converts a list of `IndexPriceUpdate` into Arrow IPC bytes for Python.
259///
260/// # Errors
261///
262/// Returns a `PyErr` if encoding fails.
263#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
264pub fn py_index_prices_to_arrow_record_batch_bytes(
265    py: Python,
266    data: Vec<IndexPriceUpdate>,
267) -> PyResult<Py<PyBytes>> {
268    match index_prices_to_arrow_record_batch_bytes(data) {
269        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
270        Err(e) => Err(to_pyvalue_err(e)),
271    }
272}
273
274/// Converts a list of `InstrumentClose` into Arrow IPC bytes for Python.
275///
276/// # Errors
277///
278/// Returns a `PyErr` if encoding fails.
279#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
280pub fn py_instrument_closes_to_arrow_record_batch_bytes(
281    py: Python,
282    data: Vec<InstrumentClose>,
283) -> PyResult<Py<PyBytes>> {
284    match instrument_closes_to_arrow_record_batch_bytes(data) {
285        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
286        Err(e) => Err(to_pyvalue_err(e)),
287    }
288}