// nautilus_serialization/arrow/mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Nautilus types.
17
18pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28    collections::HashMap,
29    io::{self, Write},
30};
31
32use arrow::{
33    array::{Array, ArrayRef},
34    datatypes::{DataType, Schema},
35    error::ArrowError,
36    ipc::writer::StreamWriter,
37    record_batch::RecordBatch,
38};
39use nautilus_model::{
40    data::{
41        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43    },
44    types::{price::PriceRaw, quantity::QuantityRaw},
45};
46#[cfg(feature = "python")]
47use pyo3::prelude::*;
48
// Define metadata key constants
/// Schema metadata key under which the bar type is stored.
const KEY_BAR_TYPE: &str = "bar_type";
/// Schema metadata key under which the instrument ID is stored.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Schema metadata key under which the price decimal precision is stored.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Schema metadata key under which the size (quantity) decimal precision is stored.
const KEY_SIZE_PRECISION: &str = "size_precision";
54
/// Errors that can occur when streaming Arrow record batches to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An Arrow operation (e.g. writing a record batch) failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// A Python interop call failed (only available with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
65
/// Errors that can occur when encoding to or decoding from Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input contained no elements.
    #[error("Empty data")]
    EmptyData,
    /// A required key was absent from the schema metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The named column was not present at the expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A metadata or column value could not be parsed (field name, reason).
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column had a different Arrow type than expected
    /// (name, index, expected type, actual type).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An underlying Arrow operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
81
82#[inline]
83fn get_raw_price(bytes: &[u8]) -> PriceRaw {
84    PriceRaw::from_le_bytes(
85        bytes
86            .try_into()
87            .expect("Price raw bytes must be exactly the size of PriceRaw"),
88    )
89}
90
91#[inline]
92fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
93    QuantityRaw::from_le_bytes(
94        bytes
95            .try_into()
96            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
97    )
98}
99
100/// Provides Apache Arrow schema definitions for data types.
101pub trait ArrowSchemaProvider {
102    /// Returns the Arrow schema for this type with optional metadata.
103    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
104
105    /// Returns a map of field names to their Arrow data types.
106    #[must_use]
107    fn get_schema_map() -> HashMap<String, String> {
108        let schema = Self::get_schema(None);
109        let mut map = HashMap::new();
110        for field in schema.fields() {
111            let name = field.name().to_string();
112            let data_type = format!("{:?}", field.data_type());
113            map.insert(name, data_type);
114        }
115        map
116    }
117}
118
119/// Encodes data types to Apache Arrow RecordBatch format.
120pub trait EncodeToRecordBatch
121where
122    Self: Sized + ArrowSchemaProvider,
123{
124    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
125    ///
126    /// # Errors
127    ///
128    /// Returns an `ArrowError` if the encoding fails.
129    fn encode_batch(
130        metadata: &HashMap<String, String>,
131        data: &[Self],
132    ) -> Result<RecordBatch, ArrowError>;
133
134    /// Returns the metadata for this data element.
135    fn metadata(&self) -> HashMap<String, String>;
136
137    /// Returns the metadata for the first element in a chunk.
138    ///
139    /// # Panics
140    ///
141    /// Panics if `chunk` is empty.
142    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
143        chunk
144            .first()
145            .map(|elem| elem.metadata())
146            .expect("Chunk must have atleast one element to encode")
147    }
148}
149
/// Decodes data types from Apache Arrow RecordBatch format.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails (despite its name, this
    /// error type covers both encode and decode failures in this module).
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
165
/// Decodes raw Data objects from Apache Arrow RecordBatch format.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails (despite its name, this
    /// error type covers both encode and decode failures in this module).
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
181
/// Writes RecordBatch data to output streams.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
191
// Blanket implementation: any `std::io::Write` sink can receive record batches.
impl<T: Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // NOTE(review): a new `StreamWriter` is constructed per call, so each call
        // emits a complete Arrow IPC stream (schema header + batch + end-of-stream
        // marker) rather than appending to one continuous stream — confirm this
        // framing is what downstream readers expect.
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        // `finish` writes the end-of-stream marker; errors here must not be dropped.
        writer.finish()?;
        Ok(())
    }
}
200
201/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
202///
203/// # Errors
204///
205/// Returns an error if:
206/// - `column_index` is out of range: `EncodingError::MissingColumn`.
207/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
208pub fn extract_column<'a, T: Array + 'static>(
209    cols: &'a [ArrayRef],
210    column_key: &'static str,
211    column_index: usize,
212    expected_type: DataType,
213) -> Result<&'a T, EncodingError> {
214    let column_values = cols
215        .get(column_index)
216        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
217    let downcasted_values =
218        column_values
219            .as_any()
220            .downcast_ref::<T>()
221            .ok_or(EncodingError::InvalidColumnType(
222                column_key,
223                column_index,
224                expected_type,
225                column_values.data_type().clone(),
226            ))?;
227    Ok(downcasted_values)
228}
229
230/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
231///
232/// # Errors
233///
234/// Returns an error if:
235/// - `data` is empty: `EncodingError::EmptyData`.
236/// - Encoding fails: `EncodingError::ArrowError`.
237pub fn book_deltas_to_arrow_record_batch_bytes(
238    data: Vec<OrderBookDelta>,
239) -> Result<RecordBatch, EncodingError> {
240    if data.is_empty() {
241        return Err(EncodingError::EmptyData);
242    }
243
244    // Extract metadata from chunk
245    let metadata = OrderBookDelta::chunk_metadata(&data);
246    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
247}
248
249/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
250///
251/// # Errors
252///
253/// Returns an error if:
254/// - `data` is empty: `EncodingError::EmptyData`.
255/// - Encoding fails: `EncodingError::ArrowError`.
256///
257/// # Panics
258///
259/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
260pub fn book_depth10_to_arrow_record_batch_bytes(
261    data: Vec<OrderBookDepth10>,
262) -> Result<RecordBatch, EncodingError> {
263    if data.is_empty() {
264        return Err(EncodingError::EmptyData);
265    }
266
267    // Take first element and extract metadata
268    // SAFETY: Unwrap safe as already checked that `data` not empty
269    let first = data.first().unwrap();
270    let metadata = first.metadata();
271    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
272}
273
274/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
275///
276/// # Errors
277///
278/// Returns an error if:
279/// - `data` is empty: `EncodingError::EmptyData`.
280/// - Encoding fails: `EncodingError::ArrowError`.
281///
282/// # Panics
283///
284/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
285pub fn quotes_to_arrow_record_batch_bytes(
286    data: Vec<QuoteTick>,
287) -> Result<RecordBatch, EncodingError> {
288    if data.is_empty() {
289        return Err(EncodingError::EmptyData);
290    }
291
292    // Take first element and extract metadata
293    // SAFETY: Unwrap safe as already checked that `data` not empty
294    let first = data.first().unwrap();
295    let metadata = first.metadata();
296    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
297}
298
299/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
300///
301/// # Errors
302///
303/// Returns an error if:
304/// - `data` is empty: `EncodingError::EmptyData`.
305/// - Encoding fails: `EncodingError::ArrowError`.
306///
307/// # Panics
308///
309/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
310pub fn trades_to_arrow_record_batch_bytes(
311    data: Vec<TradeTick>,
312) -> Result<RecordBatch, EncodingError> {
313    if data.is_empty() {
314        return Err(EncodingError::EmptyData);
315    }
316
317    // Take first element and extract metadata
318    // SAFETY: Unwrap safe as already checked that `data` not empty
319    let first = data.first().unwrap();
320    let metadata = first.metadata();
321    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
322}
323
324/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
325///
326/// # Errors
327///
328/// Returns an error if:
329/// - `data` is empty: `EncodingError::EmptyData`.
330/// - Encoding fails: `EncodingError::ArrowError`.
331///
332/// # Panics
333///
334/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
335pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
336    if data.is_empty() {
337        return Err(EncodingError::EmptyData);
338    }
339
340    // Take first element and extract metadata
341    // SAFETY: Unwrap safe as already checked that `data` not empty
342    let first = data.first().unwrap();
343    let metadata = first.metadata();
344    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
345}
346
347/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
348///
349/// # Errors
350///
351/// Returns an error if:
352/// - `data` is empty: `EncodingError::EmptyData`.
353/// - Encoding fails: `EncodingError::ArrowError`.
354///
355/// # Panics
356///
357/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
358pub fn mark_prices_to_arrow_record_batch_bytes(
359    data: Vec<MarkPriceUpdate>,
360) -> Result<RecordBatch, EncodingError> {
361    if data.is_empty() {
362        return Err(EncodingError::EmptyData);
363    }
364
365    // Take first element and extract metadata
366    // SAFETY: Unwrap safe as already checked that `data` not empty
367    let first = data.first().unwrap();
368    let metadata = first.metadata();
369    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
370}
371
372/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
373///
374/// # Errors
375///
376/// Returns an error if:
377/// - `data` is empty: `EncodingError::EmptyData`.
378/// - Encoding fails: `EncodingError::ArrowError`.
379///
380/// # Panics
381///
382/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
383pub fn index_prices_to_arrow_record_batch_bytes(
384    data: Vec<IndexPriceUpdate>,
385) -> Result<RecordBatch, EncodingError> {
386    if data.is_empty() {
387        return Err(EncodingError::EmptyData);
388    }
389
390    // Take first element and extract metadata
391    // SAFETY: Unwrap safe as already checked that `data` not empty
392    let first = data.first().unwrap();
393    let metadata = first.metadata();
394    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
395}
396
397/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
398///
399/// # Errors
400///
401/// Returns an error if:
402/// - `data` is empty: `EncodingError::EmptyData`.
403/// - Encoding fails: `EncodingError::ArrowError`.
404///
405/// # Panics
406///
407/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
408pub fn instrument_closes_to_arrow_record_batch_bytes(
409    data: Vec<InstrumentClose>,
410) -> Result<RecordBatch, EncodingError> {
411    if data.is_empty() {
412        return Err(EncodingError::EmptyData);
413    }
414
415    // Take first element and extract metadata
416    // SAFETY: Unwrap safe as already checked that `data` not empty
417    let first = data.first().unwrap();
418    let metadata = first.metadata();
419    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
420}