nautilus_serialization/arrow/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Nautilus types.
17
18pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28    collections::HashMap,
29    io::{self, Write},
30};
31
32use arrow::{
33    array::{Array, ArrayRef},
34    datatypes::{DataType, Schema},
35    error::ArrowError,
36    ipc::writer::StreamWriter,
37    record_batch::RecordBatch,
38};
39use nautilus_model::{
40    data::{
41        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43    },
44    types::{price::PriceRaw, quantity::QuantityRaw},
45};
46use pyo3::prelude::*;
47
// Metadata keys attached to Arrow schemas when encoding/decoding batches
/// Metadata key holding the bar type of an encoded batch.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key holding the instrument ID of an encoded batch.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key holding the price decimal precision of an encoded batch.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key holding the size decimal precision of an encoded batch.
const KEY_SIZE_PRECISION: &str = "size_precision";
53
54#[derive(thiserror::Error, Debug)]
55pub enum DataStreamingError {
56    #[error("I/O error: {0}")]
57    IoError(#[from] io::Error),
58    #[error("Arrow error: {0}")]
59    ArrowError(#[from] arrow::error::ArrowError),
60    #[error("Python error: {0}")]
61    PythonError(#[from] PyErr),
62}
63
/// Errors that can occur when encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input contained no elements to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required schema metadata key was absent.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column (by key and index) was absent from the batch.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value could not be parsed; carries the field name and a detail message.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had the wrong Arrow data type
    /// (key, index, expected type, actual type).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An error raised by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
79
80#[inline]
81fn get_raw_price(bytes: &[u8]) -> PriceRaw {
82    PriceRaw::from_le_bytes(bytes.try_into().unwrap())
83}
84
85#[inline]
86fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
87    QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
88}
89
90pub trait ArrowSchemaProvider {
91    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
92
93    #[must_use]
94    fn get_schema_map() -> HashMap<String, String> {
95        let schema = Self::get_schema(None);
96        let mut map = HashMap::new();
97        for field in schema.fields() {
98            let name = field.name().to_string();
99            let data_type = format!("{:?}", field.data_type());
100            map.insert(name, data_type);
101        }
102        map
103    }
104}
105
106pub trait EncodeToRecordBatch
107where
108    Self: Sized + ArrowSchemaProvider,
109{
110    fn encode_batch(
111        metadata: &HashMap<String, String>,
112        data: &[Self],
113    ) -> Result<RecordBatch, ArrowError>;
114
115    fn metadata(&self) -> HashMap<String, String>;
116    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
117        chunk
118            .first()
119            .map(|elem| elem.metadata())
120            .expect("Chunk must have atleast one element to encode")
121    }
122}
123
/// Decodes a typed vector of elements from an Arrow [`RecordBatch`].
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Self>` using the given schema `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if metadata is missing or a column is
    /// absent, mistyped, or unparsable.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
133
/// Decodes type-erased [`Data`] values from an Arrow [`RecordBatch`].
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a `Vec<Data>` using the given schema `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if metadata is missing or a column is
    /// absent, mistyped, or unparsable.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
143
/// A sink that can persist Arrow record batches.
pub trait WriteStream {
    /// Writes `record_batch` to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
147
// Blanket impl: any encodable type that is also `Write` can persist batches
// via the Arrow IPC stream format.
impl<T: EncodeToRecordBatch + Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // NOTE(review): a fresh `StreamWriter` is created (and finished) per
        // call, so each batch is written as a complete IPC stream including
        // the schema header — presumably intentional; confirm against readers.
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
156
157pub fn extract_column<'a, T: Array + 'static>(
158    cols: &'a [ArrayRef],
159    column_key: &'static str,
160    column_index: usize,
161    expected_type: DataType,
162) -> Result<&'a T, EncodingError> {
163    let column_values = cols
164        .get(column_index)
165        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
166    let downcasted_values =
167        column_values
168            .as_any()
169            .downcast_ref::<T>()
170            .ok_or(EncodingError::InvalidColumnType(
171                column_key,
172                column_index,
173                expected_type,
174                column_values.data_type().clone(),
175            ))?;
176    Ok(downcasted_values)
177}
178
179pub fn book_deltas_to_arrow_record_batch_bytes(
180    data: Vec<OrderBookDelta>,
181) -> Result<RecordBatch, EncodingError> {
182    if data.is_empty() {
183        return Err(EncodingError::EmptyData);
184    }
185
186    // Extract metadata from chunk
187    let metadata = OrderBookDelta::chunk_metadata(&data);
188    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
189}
190
191pub fn book_depth10_to_arrow_record_batch_bytes(
192    data: Vec<OrderBookDepth10>,
193) -> Result<RecordBatch, EncodingError> {
194    if data.is_empty() {
195        return Err(EncodingError::EmptyData);
196    }
197
198    // Take first element and extract metadata
199    // SAFETY: Unwrap safe as already checked that `data` not empty
200    let first = data.first().unwrap();
201    let metadata = first.metadata();
202    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
203}
204
205pub fn quotes_to_arrow_record_batch_bytes(
206    data: Vec<QuoteTick>,
207) -> Result<RecordBatch, EncodingError> {
208    if data.is_empty() {
209        return Err(EncodingError::EmptyData);
210    }
211
212    // Take first element and extract metadata
213    // SAFETY: Unwrap safe as already checked that `data` not empty
214    let first = data.first().unwrap();
215    let metadata = first.metadata();
216    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
217}
218
219pub fn trades_to_arrow_record_batch_bytes(
220    data: Vec<TradeTick>,
221) -> Result<RecordBatch, EncodingError> {
222    if data.is_empty() {
223        return Err(EncodingError::EmptyData);
224    }
225
226    // Take first element and extract metadata
227    // SAFETY: Unwrap safe as already checked that `data` not empty
228    let first = data.first().unwrap();
229    let metadata = first.metadata();
230    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
231}
232
233pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
234    if data.is_empty() {
235        return Err(EncodingError::EmptyData);
236    }
237
238    // Take first element and extract metadata
239    // SAFETY: Unwrap safe as already checked that `data` not empty
240    let first = data.first().unwrap();
241    let metadata = first.metadata();
242    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
243}
244
245pub fn mark_prices_to_arrow_record_batch_bytes(
246    data: Vec<MarkPriceUpdate>,
247) -> Result<RecordBatch, EncodingError> {
248    if data.is_empty() {
249        return Err(EncodingError::EmptyData);
250    }
251
252    // Take first element and extract metadata
253    // SAFETY: Unwrap safe as already checked that `data` not empty
254    let first = data.first().unwrap();
255    let metadata = first.metadata();
256    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
257}
258
259pub fn index_prices_to_arrow_record_batch_bytes(
260    data: Vec<IndexPriceUpdate>,
261) -> Result<RecordBatch, EncodingError> {
262    if data.is_empty() {
263        return Err(EncodingError::EmptyData);
264    }
265
266    // Take first element and extract metadata
267    // SAFETY: Unwrap safe as already checked that `data` not empty
268    let first = data.first().unwrap();
269    let metadata = first.metadata();
270    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
271}
272
273pub fn instrument_closes_to_arrow_record_batch_bytes(
274    data: Vec<InstrumentClose>,
275) -> Result<RecordBatch, EncodingError> {
276    if data.is_empty() {
277        return Err(EncodingError::EmptyData);
278    }
279
280    // Take first element and extract metadata
281    // SAFETY: Unwrap safe as already checked that `data` not empty
282    let first = data.first().unwrap();
283    let metadata = first.metadata();
284    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
285}