Skip to main content

nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
20use nautilus_model::{
21    data::{
22        Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick, close::InstrumentClose,
24    },
25    python::data::{
26        pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27        pyobjects_to_instrument_closes, pyobjects_to_mark_prices, pyobjects_to_quotes,
28        pyobjects_to_trades,
29    },
30};
31use pyo3::{
32    conversion::IntoPyObjectExt,
33    prelude::*,
34    types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38    ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39    book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40    instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41    quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44/// Transforms the given record `batches` into Python `bytes`.
45fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
46    // Create a cursor to write to a byte array in memory
47    let mut cursor = Cursor::new(Vec::new());
48    {
49        let mut writer =
50            StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
51
52        writer.write(&batch).map_err(to_pyruntime_err)?;
53
54        writer.finish().map_err(to_pyruntime_err)?;
55    }
56
57    let buffer = cursor.into_inner();
58    let pybytes = PyBytes::new(py, &buffer);
59
60    Ok(pybytes.into())
61}
62
63/// Returns a mapping from field names to Arrow data types for the given Rust data class.
64///
65/// # Errors
66///
67/// Returns a `PyErr` if the class name is not recognized or schema extraction fails.
68#[pyfunction]
69pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
70    let cls_str: String = cls.getattr("__name__")?.extract()?;
71    let result_map = match cls_str.as_str() {
72        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
73        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
74        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
75        stringify!(TradeTick) => TradeTick::get_schema_map(),
76        stringify!(Bar) => Bar::get_schema_map(),
77        stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
78        stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
79        stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
80        _ => {
81            return Err(to_pytype_err(format!(
82                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
83            )));
84        }
85    };
86
87    result_map.into_py_any(py)
88}
89
90/// Returns Python `bytes` from the given list of legacy data objects, which can be passed
91/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
92///
93/// # Errors
94///
95/// Returns an error if:
96/// - The input list is empty: `PyErr`.
97/// - An unsupported data type is encountered or conversion fails: `PyErr`.
98///
99/// # Panics
100///
101/// Panics if `data.first()` returns `None` (should not occur due to emptiness check).
102#[pyfunction]
103pub fn pyobjects_to_arrow_record_batch_bytes(
104    py: Python,
105    data: Vec<Bound<'_, PyAny>>,
106) -> PyResult<Py<PyBytes>> {
107    if data.is_empty() {
108        return Err(to_pyvalue_err("Empty data"));
109    }
110
111    let data_type: String = data
112        .first()
113        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
114        .getattr("__class__")?
115        .getattr("__name__")?
116        .extract()?;
117
118    match data_type.as_str() {
119        stringify!(OrderBookDelta) => {
120            let deltas = pyobjects_to_book_deltas(data)?;
121            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
122        }
123        stringify!(OrderBookDepth10) => {
124            let depth_snapshots: Vec<OrderBookDepth10> = data
125                .into_iter()
126                .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
127                .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
128            py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
129        }
130        stringify!(QuoteTick) => {
131            let quotes = pyobjects_to_quotes(data)?;
132            py_quotes_to_arrow_record_batch_bytes(py, quotes)
133        }
134        stringify!(TradeTick) => {
135            let trades = pyobjects_to_trades(data)?;
136            py_trades_to_arrow_record_batch_bytes(py, trades)
137        }
138        stringify!(Bar) => {
139            let bars = pyobjects_to_bars(data)?;
140            py_bars_to_arrow_record_batch_bytes(py, bars)
141        }
142        stringify!(MarkPriceUpdate) => {
143            let updates = pyobjects_to_mark_prices(data)?;
144            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
145        }
146        stringify!(IndexPriceUpdate) => {
147            let index_prices = pyobjects_to_index_prices(data)?;
148            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
149        }
150        stringify!(InstrumentClose) => {
151            let closes = pyobjects_to_instrument_closes(data)?;
152            py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
153        }
154        _ => Err(to_pyvalue_err(format!(
155            "unsupported data type: {data_type}"
156        ))),
157    }
158}
159
160/// Converts a list of `OrderBookDelta` into Arrow IPC bytes for Python.
161///
162/// # Errors
163///
164/// Returns a `PyErr` if encoding fails.
165#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
166pub fn py_book_deltas_to_arrow_record_batch_bytes(
167    py: Python,
168    data: Vec<OrderBookDelta>,
169) -> PyResult<Py<PyBytes>> {
170    match book_deltas_to_arrow_record_batch_bytes(data) {
171        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
172        Err(e) => Err(to_pyvalue_err(e)),
173    }
174}
175
176/// Converts a list of `OrderBookDepth10` into Arrow IPC bytes for Python.
177///
178/// # Errors
179///
180/// Returns a `PyErr` if encoding fails.
181#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
182pub fn py_book_depth10_to_arrow_record_batch_bytes(
183    py: Python,
184    data: Vec<OrderBookDepth10>,
185) -> PyResult<Py<PyBytes>> {
186    match book_depth10_to_arrow_record_batch_bytes(data) {
187        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
188        Err(e) => Err(to_pyvalue_err(e)),
189    }
190}
191
192/// Converts a list of `QuoteTick` into Arrow IPC bytes for Python.
193///
194/// # Errors
195///
196/// Returns a `PyErr` if encoding fails.
197#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
198pub fn py_quotes_to_arrow_record_batch_bytes(
199    py: Python,
200    data: Vec<QuoteTick>,
201) -> PyResult<Py<PyBytes>> {
202    match quotes_to_arrow_record_batch_bytes(data) {
203        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
204        Err(e) => Err(to_pyvalue_err(e)),
205    }
206}
207
208/// Converts a list of `TradeTick` into Arrow IPC bytes for Python.
209///
210/// # Errors
211///
212/// Returns a `PyErr` if encoding fails.
213#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
214pub fn py_trades_to_arrow_record_batch_bytes(
215    py: Python,
216    data: Vec<TradeTick>,
217) -> PyResult<Py<PyBytes>> {
218    match trades_to_arrow_record_batch_bytes(data) {
219        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
220        Err(e) => Err(to_pyvalue_err(e)),
221    }
222}
223
224/// Converts a list of `Bar` into Arrow IPC bytes for Python.
225///
226/// # Errors
227///
228/// Returns a `PyErr` if encoding fails.
229#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
230pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
231    match bars_to_arrow_record_batch_bytes(data) {
232        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
233        Err(e) => Err(to_pyvalue_err(e)),
234    }
235}
236
237/// Converts a list of `MarkPriceUpdate` into Arrow IPC bytes for Python.
238///
239/// # Errors
240///
241/// Returns a `PyErr` if encoding fails.
242#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
243pub fn py_mark_prices_to_arrow_record_batch_bytes(
244    py: Python,
245    data: Vec<MarkPriceUpdate>,
246) -> PyResult<Py<PyBytes>> {
247    match mark_prices_to_arrow_record_batch_bytes(data) {
248        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
249        Err(e) => Err(to_pyvalue_err(e)),
250    }
251}
252
253/// Converts a list of `IndexPriceUpdate` into Arrow IPC bytes for Python.
254///
255/// # Errors
256///
257/// Returns a `PyErr` if encoding fails.
258#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
259pub fn py_index_prices_to_arrow_record_batch_bytes(
260    py: Python,
261    data: Vec<IndexPriceUpdate>,
262) -> PyResult<Py<PyBytes>> {
263    match index_prices_to_arrow_record_batch_bytes(data) {
264        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
265        Err(e) => Err(to_pyvalue_err(e)),
266    }
267}
268
269/// Converts a list of `InstrumentClose` into Arrow IPC bytes for Python.
270///
271/// # Errors
272///
273/// Returns a `PyErr` if encoding fails.
274#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
275pub fn py_instrument_closes_to_arrow_record_batch_bytes(
276    py: Python,
277    data: Vec<InstrumentClose>,
278) -> PyResult<Py<PyBytes>> {
279    match instrument_closes_to_arrow_record_batch_bytes(data) {
280        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
281        Err(e) => Err(to_pyvalue_err(e)),
282    }
283}