nautilus_serialization/arrow/mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Defines the Apache Arrow schema for Nautilus types.

pub mod bar;
pub mod close;
pub mod delta;
pub mod depth;
pub mod index_price;
pub mod mark_price;
pub mod quote;
pub mod trade;

use std::{
    collections::HashMap,
    io::{self, Write},
};

use arrow::{
    array::{Array, ArrayRef, FixedSizeBinaryArray},
    datatypes::{DataType, Schema},
    error::ArrowError,
    ipc::writer::StreamWriter,
    record_batch::RecordBatch,
};
use nautilus_model::{
    data::{
        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
    },
    types::{
        PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
        fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
        price::PriceRaw,
        quantity::QuantityRaw,
    },
};
#[cfg(feature = "python")]
use pyo3::prelude::*;
// Define metadata key constants
const KEY_BAR_TYPE: &str = "bar_type";
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
const KEY_PRICE_PRECISION: &str = "price_precision";
const KEY_SIZE_PRECISION: &str = "size_precision";

#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}

#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    #[error("Empty data")]
    EmptyData,
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
         but this build expects {expected_bytes} bytes. The catalog was created with a different \
         precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
         build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        field: &'static str,
        expected_bytes: i32,
        actual_bytes: i32,
    },
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}

#[inline]
fn get_raw_price(bytes: &[u8]) -> PriceRaw {
    PriceRaw::from_le_bytes(
        bytes
            .try_into()
            .expect("Price raw bytes must be exactly the size of PriceRaw"),
    )
}

#[inline]
fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
    QuantityRaw::from_le_bytes(
        bytes
            .try_into()
            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
    )
}

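// Example (sketch): the raw getters simply reverse `to_le_bytes`. This assumes
// a standard-precision build, where `PriceRaw` is `i64`:
//
//     let raw: PriceRaw = 1_234_500_000;
//     assert_eq!(get_raw_price(&raw.to_le_bytes()), raw);
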
/// Gets raw price bytes and corrects for floating-point precision errors in stored data.
///
/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)`, which
/// introduces floating-point errors. This corrects the raw value to the nearest valid
/// multiple of the scale factor for the given precision.
///
/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
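///
/// # Example
///
/// ```rust,ignore
/// // Sketch: assuming a standard-precision build (FIXED_SCALAR = 1e9), a raw
/// // value of 1_229_999_999 at precision 2 snaps to the nearest multiple of
/// // 10^7, i.e. 1_230_000_000:
/// let raw = get_corrected_raw_price(&1_229_999_999i64.to_le_bytes(), 2);
/// assert_eq!(raw, 1_230_000_000);
/// ```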
#[inline]
fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
    let raw = get_raw_price(bytes);

    // Preserve sentinel values unchanged
    if raw == PRICE_UNDEF || raw == PRICE_ERROR {
        return raw;
    }

    correct_price_raw(raw, precision)
}

/// Gets raw quantity bytes and corrects for floating-point precision errors in stored data.
///
/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)`, which
/// introduces floating-point errors. This corrects the raw value to the nearest valid
/// multiple of the scale factor for the given precision.
///
/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
#[inline]
fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
    let raw = get_raw_quantity(bytes);

    // Preserve sentinel values unchanged
    if raw == QUANTITY_UNDEF {
        return raw;
    }

    correct_quantity_raw(raw, precision)
}

/// Decodes a [`Price`] from raw bytes with bounds validation.
///
/// Uses corrected raw values to handle floating-point precision errors in stored data.
/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
fn decode_price(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Price, EncodingError> {
    let raw = get_corrected_raw_price(bytes, precision);
    Price::from_raw_checked(raw, precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

/// Decodes a [`Quantity`] from raw bytes with bounds validation.
///
/// Uses corrected raw values to handle floating-point precision errors in stored data.
/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
fn decode_quantity(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Quantity, EncodingError> {
    let raw = get_corrected_raw_quantity(bytes, precision);
    Quantity::from_raw_checked(raw, precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

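// Example (sketch): decoding one cell of a `FixedSizeBinaryArray` column.
// `price_array`, `precision`, and `row` are hypothetical locals:
//
//     let bytes = price_array.value(row);
//     let price = decode_price(bytes, precision, "price", row)?;
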
/// Decodes a [`Price`] from raw bytes, using precision 0 for sentinel values.
///
/// For order book data where sentinel values indicate empty levels.
fn decode_price_with_sentinel(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Price, EncodingError> {
    let raw = get_raw_price(bytes);
    let (final_raw, final_precision) = if raw == PRICE_UNDEF {
        (raw, 0)
    } else {
        (get_corrected_raw_price(bytes, precision), precision)
    };
    Price::from_raw_checked(final_raw, final_precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

/// Decodes a [`Quantity`] from raw bytes, using precision 0 for sentinel values.
///
/// For order book data where sentinel values indicate empty levels.
fn decode_quantity_with_sentinel(
    bytes: &[u8],
    precision: u8,
    field: &'static str,
    row: usize,
) -> Result<Quantity, EncodingError> {
    let raw = get_raw_quantity(bytes);
    let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
        (raw, 0)
    } else {
        (get_corrected_raw_quantity(bytes, precision), precision)
    };
    Quantity::from_raw_checked(final_raw, final_precision)
        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
}

/// Provides Apache Arrow schema definitions for data types.
pub trait ArrowSchemaProvider {
    /// Returns the Arrow schema for this type with optional metadata.
    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;

    /// Returns a map of field names to their Arrow data types (as debug-formatted strings).
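    ///
    /// ```rust,ignore
    /// // Sketch, assuming `QuoteTick` implements `ArrowSchemaProvider` via the
    /// // `quote` submodule (the field name below is illustrative):
    /// let map = QuoteTick::get_schema_map();
    /// assert!(map.contains_key("bid_price"));
    /// ```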
    #[must_use]
    fn get_schema_map() -> HashMap<String, String> {
        let schema = Self::get_schema(None);
        let mut map = HashMap::new();
        for field in schema.fields() {
            let name = field.name().clone();
            let data_type = format!("{:?}", field.data_type());
            map.insert(name, data_type);
        }
        map
    }
}

/// Encodes data types to Apache Arrow RecordBatch format.
pub trait EncodeToRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `ArrowError` if the encoding fails.
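    ///
    /// ```rust,ignore
    /// // Sketch: `quotes` is assumed to be a non-empty `Vec<QuoteTick>`.
    /// let metadata = QuoteTick::chunk_metadata(&quotes);
    /// let batch = QuoteTick::encode_batch(&metadata, &quotes)?;
    /// ```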
    fn encode_batch(
        metadata: &HashMap<String, String>,
        data: &[Self],
    ) -> Result<RecordBatch, ArrowError>;

    /// Returns the metadata for this data element.
    fn metadata(&self) -> HashMap<String, String>;

    /// Returns the metadata for the first element in a chunk.
    ///
    /// # Panics
    ///
    /// Panics if `chunk` is empty.
    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
        chunk
            .first()
            .map(|elem| elem.metadata())
            .expect("Chunk must have at least one element to encode")
    }
}

/// Decodes data types from Apache Arrow RecordBatch format.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
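    ///
    /// ```rust,ignore
    /// // Sketch: round-trip an encoded batch back into typed values.
    /// let quotes: Vec<QuoteTick> = QuoteTick::decode_batch(&metadata, batch)?;
    /// ```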
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}

/// Decodes raw Data objects from Apache Arrow RecordBatch format.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}

/// Writes RecordBatch data to output streams.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}

impl<T: Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}

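// Example (sketch): any `io::Write` sink (a `File`, a `Vec<u8>`, ...) gets
// `WriteStream` via the blanket impl above. Because `io::Write` also defines a
// `write` method, a fully qualified call avoids ambiguity:
//
//     let mut buf: Vec<u8> = Vec::new();
//     WriteStream::write(&mut buf, &record_batch)?;
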
/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
///
/// # Errors
///
/// Returns an error if:
/// - `column_index` is out of range: `EncodingError::MissingColumn`.
/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
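///
/// # Example
///
/// ```rust,ignore
/// use arrow::array::Int64Array;
///
/// // Sketch: `cols` is assumed to come from `record_batch.columns()`;
/// // the column name and index are illustrative.
/// let ts: &Int64Array = extract_column(cols, "ts_event", 3, DataType::Int64)?;
/// ```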
pub fn extract_column<'a, T: Array + 'static>(
    cols: &'a [ArrayRef],
    column_key: &'static str,
    column_index: usize,
    expected_type: DataType,
) -> Result<&'a T, EncodingError> {
    let column_values = cols
        .get(column_index)
        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
    let downcasted_values =
        column_values
            .as_any()
            .downcast_ref::<T>()
            .ok_or(EncodingError::InvalidColumnType(
                column_key,
                column_index,
                expected_type,
                column_values.data_type().clone(),
            ))?;
    Ok(downcasted_values)
}

/// Validates that a [`FixedSizeBinaryArray`] has the expected precision byte width.
///
/// This detects precision mode mismatches that occur when catalog data was encoded
/// with a different precision mode (64-bit standard vs 128-bit high-precision).
///
/// # Errors
///
/// Returns [`EncodingError::PrecisionMismatch`] if the actual byte width doesn't
/// match [`PRECISION_BYTES`].
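///
/// ```rust,ignore
/// // Sketch: guard a price column before reading raw bytes from it.
/// let price_array: &FixedSizeBinaryArray = extract_column(
///     cols,
///     "price",
///     1,
///     DataType::FixedSizeBinary(PRECISION_BYTES),
/// )?;
/// validate_precision_bytes(price_array, "price")?;
/// ```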
pub fn validate_precision_bytes(
    array: &FixedSizeBinaryArray,
    field: &'static str,
) -> Result<(), EncodingError> {
    let actual = array.value_length();
    if actual != PRECISION_BYTES {
        return Err(EncodingError::PrecisionMismatch {
            field,
            expected_bytes: PRECISION_BYTES,
            actual_bytes: actual,
        });
    }
    Ok(())
}

/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
pub fn book_deltas_to_arrow_record_batch_bytes(
    data: Vec<OrderBookDelta>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Extract metadata from chunk
    let metadata = OrderBookDelta::chunk_metadata(&data);
    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

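// Example (sketch): the conversion helpers in this file all follow the same
// shape; `deltas` and `sink` are hypothetical locals:
//
//     let batch = book_deltas_to_arrow_record_batch_bytes(deltas)?;
//     WriteStream::write(&mut sink, &batch)?;
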
/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn book_depth10_to_arrow_record_batch_bytes(
    data: Vec<OrderBookDepth10>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn quotes_to_arrow_record_batch_bytes(
    data: Vec<QuoteTick>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn trades_to_arrow_record_batch_bytes(
    data: Vec<TradeTick>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn mark_prices_to_arrow_record_batch_bytes(
    data: Vec<MarkPriceUpdate>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn index_prices_to_arrow_record_batch_bytes(
    data: Vec<IndexPriceUpdate>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}

/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
///
/// # Errors
///
/// Returns an error if:
/// - `data` is empty: `EncodingError::EmptyData`.
/// - Encoding fails: `EncodingError::ArrowError`.
///
/// # Panics
///
/// Does not panic in practice: the internal `unwrap` is guarded by the
/// empty-data check, which returns an error first.
pub fn instrument_closes_to_arrow_record_batch_bytes(
    data: Vec<InstrumentClose>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // Take first element and extract metadata
    // SAFETY: Unwrap is safe as `data` was already checked to be non-empty
    let first = data.first().unwrap();
    let metadata = first.metadata();
    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}