// nautilus_serialization/arrow/mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! Defines the Apache Arrow schema for Nautilus types.
17
// Per-type Arrow schema and encode/decode implementations
pub mod bar;
pub mod close;
pub mod delta;
pub mod depth;
pub mod index_price;
pub mod mark_price;
pub mod quote;
pub mod trade;
26
27use std::{
28    collections::HashMap,
29    io::{self, Write},
30};
31
32use arrow::{
33    array::{Array, ArrayRef},
34    datatypes::{DataType, Schema},
35    error::ArrowError,
36    ipc::writer::StreamWriter,
37    record_batch::RecordBatch,
38};
39// Only the i64/u64 or i128/u128 variants are used depending on high-precision feature
40#[allow(unused_imports)]
41use nautilus_model::types::fixed::{
42    correct_raw_i64, correct_raw_i128, correct_raw_u64, correct_raw_u128,
43};
44use nautilus_model::{
45    data::{
46        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
47        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
48    },
49    types::{PRICE_ERROR, PRICE_UNDEF, QUANTITY_UNDEF, price::PriceRaw, quantity::QuantityRaw},
50};
51#[cfg(feature = "python")]
52use pyo3::prelude::*;
53
// Metadata keys shared by the Arrow schema implementations in this module
const KEY_BAR_TYPE: &str = "bar_type";
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
const KEY_PRICE_PRECISION: &str = "price_precision";
const KEY_SIZE_PRECISION: &str = "size_precision";
59
/// Errors that can occur while streaming record batches to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An I/O failure from the underlying writer.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// A failure raised by the Arrow library while encoding or writing.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error propagated from the Python layer (only with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
70
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice/vector contained no elements.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent; payload is the key name.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column was absent; payload is (column name, expected index).
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value failed to parse; payload is (field name, parser message).
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column had the wrong Arrow type; payload is (name, index, expected, found).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// A failure raised by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
86
87#[inline]
88fn get_raw_price(bytes: &[u8]) -> PriceRaw {
89    PriceRaw::from_le_bytes(
90        bytes
91            .try_into()
92            .expect("Price raw bytes must be exactly the size of PriceRaw"),
93    )
94}
95
96#[inline]
97fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
98    QuantityRaw::from_le_bytes(
99        bytes
100            .try_into()
101            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
102    )
103}
104
105/// Gets raw price bytes and corrects for floating-point precision errors in stored data.
106///
107/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
108/// introduces floating-point errors. This corrects the raw value to the nearest valid
109/// multiple of the scale factor for the given precision.
110///
111/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
112#[inline]
113fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
114    let raw = get_raw_price(bytes);
115
116    // Preserve sentinel values unchanged
117    if raw == PRICE_UNDEF || raw == PRICE_ERROR {
118        return raw;
119    }
120
121    #[cfg(feature = "high-precision")]
122    {
123        correct_raw_i128(raw, precision)
124    }
125    #[cfg(not(feature = "high-precision"))]
126    {
127        correct_raw_i64(raw, precision)
128    }
129}
130
131/// Gets raw quantity bytes and corrects for floating-point precision errors in stored data.
132///
133/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
134/// introduces floating-point errors. This corrects the raw value to the nearest valid
135/// multiple of the scale factor for the given precision.
136///
137/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
138#[inline]
139fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
140    let raw = get_raw_quantity(bytes);
141
142    // Preserve sentinel values unchanged
143    if raw == QUANTITY_UNDEF {
144        return raw;
145    }
146
147    #[cfg(feature = "high-precision")]
148    {
149        correct_raw_u128(raw, precision)
150    }
151    #[cfg(not(feature = "high-precision"))]
152    {
153        correct_raw_u64(raw, precision)
154    }
155}
156
157/// Provides Apache Arrow schema definitions for data types.
158pub trait ArrowSchemaProvider {
159    /// Returns the Arrow schema for this type with optional metadata.
160    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
161
162    /// Returns a map of field names to their Arrow data types.
163    #[must_use]
164    fn get_schema_map() -> HashMap<String, String> {
165        let schema = Self::get_schema(None);
166        let mut map = HashMap::new();
167        for field in schema.fields() {
168            let name = field.name().clone();
169            let data_type = format!("{:?}", field.data_type());
170            map.insert(name, data_type);
171        }
172        map
173    }
174}
175
176/// Encodes data types to Apache Arrow RecordBatch format.
177pub trait EncodeToRecordBatch
178where
179    Self: Sized + ArrowSchemaProvider,
180{
181    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
182    ///
183    /// # Errors
184    ///
185    /// Returns an `ArrowError` if the encoding fails.
186    fn encode_batch(
187        metadata: &HashMap<String, String>,
188        data: &[Self],
189    ) -> Result<RecordBatch, ArrowError>;
190
191    /// Returns the metadata for this data element.
192    fn metadata(&self) -> HashMap<String, String>;
193
194    /// Returns the metadata for the first element in a chunk.
195    ///
196    /// # Panics
197    ///
198    /// Panics if `chunk` is empty.
199    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
200        chunk
201            .first()
202            .map(|elem| elem.metadata())
203            .expect("Chunk must have at least one element to encode")
204    }
205}
206
/// Decodes data types from Apache Arrow `RecordBatch` format.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the
    /// provided metadata.
    ///
    /// NOTE(review): implementors presumably read the `KEY_*` metadata entries defined at the
    /// top of this module — confirm per type.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
222
/// Decodes raw `Data` objects from Apache Arrow `RecordBatch` format.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values (the type-erased enum), using the
    /// provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
238
/// Writes `RecordBatch` data to output streams.
///
/// Blanket-implemented below for every `std::io::Write` type.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
248
249impl<T: Write> WriteStream for T {
250    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
251        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
252        writer.write(record_batch)?;
253        writer.finish()?;
254        Ok(())
255    }
256}
257
258/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
259///
260/// # Errors
261///
262/// Returns an error if:
263/// - `column_index` is out of range: `EncodingError::MissingColumn`.
264/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
265pub fn extract_column<'a, T: Array + 'static>(
266    cols: &'a [ArrayRef],
267    column_key: &'static str,
268    column_index: usize,
269    expected_type: DataType,
270) -> Result<&'a T, EncodingError> {
271    let column_values = cols
272        .get(column_index)
273        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
274    let downcasted_values =
275        column_values
276            .as_any()
277            .downcast_ref::<T>()
278            .ok_or(EncodingError::InvalidColumnType(
279                column_key,
280                column_index,
281                expected_type,
282                column_values.data_type().clone(),
283            ))?;
284    Ok(downcasted_values)
285}
286
287/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
288///
289/// # Errors
290///
291/// Returns an error if:
292/// - `data` is empty: `EncodingError::EmptyData`.
293/// - Encoding fails: `EncodingError::ArrowError`.
294pub fn book_deltas_to_arrow_record_batch_bytes(
295    data: Vec<OrderBookDelta>,
296) -> Result<RecordBatch, EncodingError> {
297    if data.is_empty() {
298        return Err(EncodingError::EmptyData);
299    }
300
301    // Extract metadata from chunk
302    let metadata = OrderBookDelta::chunk_metadata(&data);
303    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
304}
305
306/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
307///
308/// # Errors
309///
310/// Returns an error if:
311/// - `data` is empty: `EncodingError::EmptyData`.
312/// - Encoding fails: `EncodingError::ArrowError`.
313///
314/// # Panics
315///
316/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
317pub fn book_depth10_to_arrow_record_batch_bytes(
318    data: Vec<OrderBookDepth10>,
319) -> Result<RecordBatch, EncodingError> {
320    if data.is_empty() {
321        return Err(EncodingError::EmptyData);
322    }
323
324    // Take first element and extract metadata
325    // SAFETY: Unwrap safe as already checked that `data` not empty
326    let first = data.first().unwrap();
327    let metadata = first.metadata();
328    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
329}
330
331/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
332///
333/// # Errors
334///
335/// Returns an error if:
336/// - `data` is empty: `EncodingError::EmptyData`.
337/// - Encoding fails: `EncodingError::ArrowError`.
338///
339/// # Panics
340///
341/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
342pub fn quotes_to_arrow_record_batch_bytes(
343    data: Vec<QuoteTick>,
344) -> Result<RecordBatch, EncodingError> {
345    if data.is_empty() {
346        return Err(EncodingError::EmptyData);
347    }
348
349    // Take first element and extract metadata
350    // SAFETY: Unwrap safe as already checked that `data` not empty
351    let first = data.first().unwrap();
352    let metadata = first.metadata();
353    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
354}
355
356/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
357///
358/// # Errors
359///
360/// Returns an error if:
361/// - `data` is empty: `EncodingError::EmptyData`.
362/// - Encoding fails: `EncodingError::ArrowError`.
363///
364/// # Panics
365///
366/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
367pub fn trades_to_arrow_record_batch_bytes(
368    data: Vec<TradeTick>,
369) -> Result<RecordBatch, EncodingError> {
370    if data.is_empty() {
371        return Err(EncodingError::EmptyData);
372    }
373
374    // Take first element and extract metadata
375    // SAFETY: Unwrap safe as already checked that `data` not empty
376    let first = data.first().unwrap();
377    let metadata = first.metadata();
378    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
379}
380
381/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
382///
383/// # Errors
384///
385/// Returns an error if:
386/// - `data` is empty: `EncodingError::EmptyData`.
387/// - Encoding fails: `EncodingError::ArrowError`.
388///
389/// # Panics
390///
391/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
392pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
393    if data.is_empty() {
394        return Err(EncodingError::EmptyData);
395    }
396
397    // Take first element and extract metadata
398    // SAFETY: Unwrap safe as already checked that `data` not empty
399    let first = data.first().unwrap();
400    let metadata = first.metadata();
401    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
402}
403
404/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
405///
406/// # Errors
407///
408/// Returns an error if:
409/// - `data` is empty: `EncodingError::EmptyData`.
410/// - Encoding fails: `EncodingError::ArrowError`.
411///
412/// # Panics
413///
414/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
415pub fn mark_prices_to_arrow_record_batch_bytes(
416    data: Vec<MarkPriceUpdate>,
417) -> Result<RecordBatch, EncodingError> {
418    if data.is_empty() {
419        return Err(EncodingError::EmptyData);
420    }
421
422    // Take first element and extract metadata
423    // SAFETY: Unwrap safe as already checked that `data` not empty
424    let first = data.first().unwrap();
425    let metadata = first.metadata();
426    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
427}
428
429/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
430///
431/// # Errors
432///
433/// Returns an error if:
434/// - `data` is empty: `EncodingError::EmptyData`.
435/// - Encoding fails: `EncodingError::ArrowError`.
436///
437/// # Panics
438///
439/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
440pub fn index_prices_to_arrow_record_batch_bytes(
441    data: Vec<IndexPriceUpdate>,
442) -> Result<RecordBatch, EncodingError> {
443    if data.is_empty() {
444        return Err(EncodingError::EmptyData);
445    }
446
447    // Take first element and extract metadata
448    // SAFETY: Unwrap safe as already checked that `data` not empty
449    let first = data.first().unwrap();
450    let metadata = first.metadata();
451    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
452}
453
454/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
455///
456/// # Errors
457///
458/// Returns an error if:
459/// - `data` is empty: `EncodingError::EmptyData`.
460/// - Encoding fails: `EncodingError::ArrowError`.
461///
462/// # Panics
463///
464/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
465pub fn instrument_closes_to_arrow_record_batch_bytes(
466    data: Vec<InstrumentClose>,
467) -> Result<RecordBatch, EncodingError> {
468    if data.is_empty() {
469        return Err(EncodingError::EmptyData);
470    }
471
472    // Take first element and extract metadata
473    // SAFETY: Unwrap safe as already checked that `data` not empty
474    let first = data.first().unwrap();
475    let metadata = first.metadata();
476    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
477}