nautilus_serialization/arrow/
mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Defines Apache Arrow schemas and record-batch encoding/decoding for Nautilus data types.

pub mod bar;
pub mod close;
pub mod delta;
pub mod depth;
pub mod index_price;
pub mod mark_price;
pub mod quote;
pub mod trade;

use std::{
    collections::HashMap,
    io::{self, Write},
};

use arrow::{
    array::{Array, ArrayRef},
    datatypes::{DataType, Schema},
    error::ArrowError,
    ipc::writer::StreamWriter,
    record_batch::RecordBatch,
};
use nautilus_model::{
    data::{
        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
    },
    types::{
        PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
        fixed::{correct_price_raw, correct_quantity_raw},
        price::PriceRaw,
        quantity::QuantityRaw,
    },
};
#[cfg(feature = "python")]
use pyo3::prelude::*;
// Metadata key constants
const KEY_BAR_TYPE: &str = "bar_type";
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
const KEY_PRICE_PRECISION: &str = "price_precision";
const KEY_SIZE_PRECISION: &str = "size_precision";

/// Errors that can occur when streaming Arrow record batches to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}

/// Errors that can occur when encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    #[error("Empty data")]
    EmptyData,
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}

/// Reads a `PriceRaw` from little-endian bytes; panics if the slice length does not match.
#[inline]
fn get_raw_price(bytes: &[u8]) -> PriceRaw {
    PriceRaw::from_le_bytes(
        bytes
            .try_into()
            .expect("Price raw bytes must be exactly the size of PriceRaw"),
    )
}

/// Reads a `QuantityRaw` from little-endian bytes; panics if the slice length does not match.
#[inline]
fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
    QuantityRaw::from_le_bytes(
        bytes
            .try_into()
            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
    )
}

/// Gets raw price bytes and corrects for floating-point precision errors in stored data.
///
/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
/// introduces floating-point errors. This corrects the raw value to the nearest valid
/// multiple of the scale factor for the given precision.
///
/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
#[inline]
fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
    let raw = get_raw_price(bytes);

    // Preserve sentinel values unchanged
    if raw == PRICE_UNDEF || raw == PRICE_ERROR {
        return raw;
    }

    correct_price_raw(raw, precision)
}

/// Gets raw quantity bytes and corrects for floating-point precision errors in stored data.
///
/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
/// introduces floating-point errors. This corrects the raw value to the nearest valid
/// multiple of the scale factor for the given precision.
///
/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
#[inline]
fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
    let raw = get_raw_quantity(bytes);

    // Preserve sentinel values unchanged
    if raw == QUANTITY_UNDEF {
        return raw;
    }

    correct_quantity_raw(raw, precision)
}
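
// Worked illustration of the correction above (a sketch; the concrete scalar depends on
// the build's fixed-point configuration). Assuming a 10^9 fixed scalar and precision 2,
// a catalog that stored 1.23 as `int(1.23 * 1e9)` holds 1_229_999_999, because 1.23 is
// not exactly representable as a double. `correct_price_raw(1_229_999_999, 2)` snaps the
// raw value back to the nearest multiple of 10^(9 - 2), i.e. 1_230_000_000.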

/// Decodes a [`Price`] from raw bytes with bounds validation.
///
/// Uses corrected raw values to handle floating-point precision errors in stored data.
/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
fn decode_price(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Price, EncodingError> {
    let raw = get_corrected_raw_price(bytes, precision);
    Price::from_raw_checked(raw, precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

/// Decodes a [`Quantity`] from raw bytes with bounds validation.
///
/// Uses corrected raw values to handle floating-point precision errors in stored data.
/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
fn decode_quantity(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Quantity, EncodingError> {
    let raw = get_corrected_raw_quantity(bytes, precision);
    Quantity::from_raw_checked(raw, precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

/// Decodes a [`Price`] from raw bytes, using precision 0 for sentinel values.
///
/// For order book data where sentinel values indicate empty levels.
fn decode_price_with_sentinel(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Price, EncodingError> {
    let raw = get_raw_price(bytes);
    let (final_raw, final_precision) = if raw == PRICE_UNDEF {
        (raw, 0)
    } else {
        (get_corrected_raw_price(bytes, precision), precision)
    };
    Price::from_raw_checked(final_raw, final_precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

/// Decodes a [`Quantity`] from raw bytes, using precision 0 for sentinel values.
///
/// For order book data where sentinel values indicate empty levels.
fn decode_quantity_with_sentinel(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Quantity, EncodingError> {
    let raw = get_raw_quantity(bytes);
    let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
        (raw, 0)
    } else {
        (get_corrected_raw_quantity(bytes, precision), precision)
    };
    Quantity::from_raw_checked(final_raw, final_precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}
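
// A minimal decoding sketch (illustrative only, not part of the public API): pull a
// fixed-size binary column of little-endian raw price bytes out of a batch and decode
// each row with `decode_price`. The column name "price" and index 0 are assumptions for
// the example; real decoders use the positions fixed by each type's schema.
#[allow(dead_code)]
fn example_decode_price_column(
    cols: &[ArrayRef],
    price_precision: u8,
) -> Result<Vec<Price>, EncodingError> {
    use arrow::array::FixedSizeBinaryArray;

    let price_values = extract_column::<FixedSizeBinaryArray>(
        cols,
        "price",
        0,
        DataType::FixedSizeBinary(std::mem::size_of::<PriceRaw>() as i32),
    )?;
    (0..price_values.len())
        .map(|row| decode_price(price_values.value(row), price_precision, "price", row))
        .collect()
}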

/// Provides Apache Arrow schema definitions for data types.
pub trait ArrowSchemaProvider {
    /// Returns the Arrow schema for this type with optional metadata.
    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;

    /// Returns a map of field names to their Arrow data types, rendered as strings.
    #[must_use]
    fn get_schema_map() -> HashMap<String, String> {
        let schema = Self::get_schema(None);
        let mut map = HashMap::new();
        for field in schema.fields() {
            let name = field.name().clone();
            let data_type = format!("{:?}", field.data_type());
            map.insert(name, data_type);
        }
        map
    }
}
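
// Usage sketch for `ArrowSchemaProvider`: build a `QuoteTick` schema carrying instrument
// and precision metadata. `QuoteTick` implements the trait via the `quote` module above;
// the instrument ID and precision values below are purely illustrative.
#[allow(dead_code)]
fn example_quote_schema_with_metadata() -> Schema {
    let metadata = HashMap::from([
        (KEY_INSTRUMENT_ID.to_string(), "AUD/USD.SIM".to_string()),
        (KEY_PRICE_PRECISION.to_string(), "5".to_string()),
        (KEY_SIZE_PRECISION.to_string(), "0".to_string()),
    ]);
    QuoteTick::get_schema(Some(metadata))
}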

/// Encodes data types to Apache Arrow RecordBatch format.
pub trait EncodeToRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `ArrowError` if the encoding fails.
    fn encode_batch(
        metadata: &HashMap<String, String>,
        data: &[Self],
    ) -> Result<RecordBatch, ArrowError>;

    /// Returns the metadata for this data element.
    fn metadata(&self) -> HashMap<String, String>;

    /// Returns the metadata for the first element in a chunk.
    ///
    /// # Panics
    ///
    /// Panics if `chunk` is empty.
    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
        chunk
            .first()
            .map(|elem| elem.metadata())
            .expect("Chunk must have at least one element to encode")
    }
}

/// Decodes data types from Apache Arrow RecordBatch format.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
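
// Round-trip sketch: encode a non-empty slice of quotes into a `RecordBatch`, then decode
// it back with the same metadata. Assumes `QuoteTick` also implements
// `DecodeFromRecordBatch` (provided by the `quote` module above).
#[allow(dead_code)]
fn example_quote_round_trip(quotes: &[QuoteTick]) -> Result<Vec<QuoteTick>, EncodingError> {
    // `chunk_metadata` panics on an empty chunk, so callers should check first
    let metadata = QuoteTick::chunk_metadata(quotes);
    let batch = QuoteTick::encode_batch(&metadata, quotes)?;
    QuoteTick::decode_batch(&metadata, batch)
}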

/// Decodes raw Data objects from Apache Arrow RecordBatch format.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}

/// Writes RecordBatch data to output streams.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}

impl<T: Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
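
// Writing sketch: any `std::io::Write` sink (here a `Vec<u8>`) gets `WriteStream` from the
// blanket impl above, producing bytes in the Arrow IPC streaming format. The fully
// qualified call avoids ambiguity with `io::Write::write`, which is also in scope.
#[allow(dead_code)]
fn example_write_batch_to_buffer(batch: &RecordBatch) -> Result<Vec<u8>, DataStreamingError> {
    let mut buffer: Vec<u8> = Vec::new();
    WriteStream::write(&mut buffer, batch)?;
    Ok(buffer)
}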

/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
///
/// # Errors
///
/// Returns an error if:
/// - `column_index` is out of range: `EncodingError::MissingColumn`.
/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
pub fn extract_column<'a, T: Array + 'static>(
    cols: &'a [ArrayRef],
    column_key: &'static str,
    column_index: usize,
    expected_type: DataType,
) -> Result<&'a T, EncodingError> {
    let column_values = cols
        .get(column_index)
        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
    let downcasted_values =
        column_values
            .as_any()
            .downcast_ref::<T>()
            .ok_or(EncodingError::InvalidColumnType(
                column_key,
                column_index,
                expected_type,
                column_values.data_type().clone(),
            ))?;
    Ok(downcasted_values)
}
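
// Extraction sketch: read a `ts_event` column of UInt64 nanosecond timestamps from a
// batch. The column name and index are assumptions for the example; real decoders use
// the positions fixed by each type's schema.
#[allow(dead_code)]
fn example_extract_ts_event(
    record_batch: &RecordBatch,
    column_index: usize,
) -> Result<Vec<u64>, EncodingError> {
    use arrow::array::UInt64Array;

    let ts_event = extract_column::<UInt64Array>(
        record_batch.columns(),
        "ts_event",
        column_index,
        DataType::UInt64,
    )?;
    Ok(ts_event.values().to_vec())
}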

/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
pub fn book_deltas_to_arrow_record_batch_bytes(
    data: Vec<OrderBookDelta>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Extract metadata from chunk
    let metadata = OrderBookDelta::chunk_metadata(&data);
    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
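
// End-to-end sketch: convert a non-empty chunk of deltas into a `RecordBatch` and stream
// it as Arrow IPC bytes, combining the helper above with the `WriteStream` blanket impl.
#[allow(dead_code)]
fn example_deltas_to_ipc_bytes(
    data: Vec<OrderBookDelta>,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let batch = book_deltas_to_arrow_record_batch_bytes(data)?;
    let mut buffer: Vec<u8> = Vec::new();
    WriteStream::write(&mut buffer, &batch)?;
    Ok(buffer)
}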

/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn book_depth10_to_arrow_record_batch_bytes(
    data: Vec<OrderBookDepth10>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn quotes_to_arrow_record_batch_bytes(
    data: Vec<QuoteTick>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn trades_to_arrow_record_batch_bytes(
    data: Vec<TradeTick>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn mark_prices_to_arrow_record_batch_bytes(
    data: Vec<MarkPriceUpdate>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn index_prices_to_arrow_record_batch_bytes(
    data: Vec<IndexPriceUpdate>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Should not panic in practice: the internal `unwrap` on the first element is
/// guarded by the empty-data check at the start of the function.
pub fn instrument_closes_to_arrow_record_batch_bytes(
    data: Vec<InstrumentClose>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap safe as already checked that `data` not empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}