pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28 collections::HashMap,
29 io::{self, Write},
30};
31
32use arrow::{
33 array::{Array, ArrayRef},
34 datatypes::{DataType, Schema},
35 error::ArrowError,
36 ipc::writer::StreamWriter,
37 record_batch::RecordBatch,
38};
39#[allow(unused_imports)]
41use nautilus_model::types::fixed::{
42 correct_raw_i64, correct_raw_i128, correct_raw_u64, correct_raw_u128,
43};
44use nautilus_model::{
45 data::{
46 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
47 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
48 },
49 types::{PRICE_ERROR, PRICE_UNDEF, QUANTITY_UNDEF, price::PriceRaw, quantity::QuantityRaw},
50};
51#[cfg(feature = "python")]
52use pyo3::prelude::*;
53
// Metadata key for the bar type of encoded bar data.
const KEY_BAR_TYPE: &str = "bar_type";
// Metadata key for the instrument ID of encoded data.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
// Metadata key for the price precision of encoded data.
const KEY_PRICE_PRECISION: &str = "price_precision";
// Metadata key for the size precision of encoded data.
const KEY_SIZE_PRECISION: &str = "size_precision";
59
/// Errors that can occur while streaming record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An I/O error from the underlying writer.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An error raised by the Arrow library while encoding or writing.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error propagated from Python (only with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
70
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice/vector contained no elements to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column (by key and index) was absent from the batch.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value for the named key failed to parse; payload holds the detail.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had a different Arrow `DataType` than expected.
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An error raised by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
86
87#[inline]
88fn get_raw_price(bytes: &[u8]) -> PriceRaw {
89 PriceRaw::from_le_bytes(
90 bytes
91 .try_into()
92 .expect("Price raw bytes must be exactly the size of PriceRaw"),
93 )
94}
95
96#[inline]
97fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
98 QuantityRaw::from_le_bytes(
99 bytes
100 .try_into()
101 .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
102 )
103}
104
105#[inline]
113fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
114 let raw = get_raw_price(bytes);
115
116 if raw == PRICE_UNDEF || raw == PRICE_ERROR {
118 return raw;
119 }
120
121 #[cfg(feature = "high-precision")]
122 {
123 correct_raw_i128(raw, precision)
124 }
125 #[cfg(not(feature = "high-precision"))]
126 {
127 correct_raw_i64(raw, precision)
128 }
129}
130
131#[inline]
139fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
140 let raw = get_raw_quantity(bytes);
141
142 if raw == QUANTITY_UNDEF {
144 return raw;
145 }
146
147 #[cfg(feature = "high-precision")]
148 {
149 correct_raw_u128(raw, precision)
150 }
151 #[cfg(not(feature = "high-precision"))]
152 {
153 correct_raw_u64(raw, precision)
154 }
155}
156
157pub trait ArrowSchemaProvider {
159 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
161
162 #[must_use]
164 fn get_schema_map() -> HashMap<String, String> {
165 let schema = Self::get_schema(None);
166 let mut map = HashMap::new();
167 for field in schema.fields() {
168 let name = field.name().clone();
169 let data_type = format!("{:?}", field.data_type());
170 map.insert(name, data_type);
171 }
172 map
173 }
174}
175
176pub trait EncodeToRecordBatch
178where
179 Self: Sized + ArrowSchemaProvider,
180{
181 fn encode_batch(
187 metadata: &HashMap<String, String>,
188 data: &[Self],
189 ) -> Result<RecordBatch, ArrowError>;
190
191 fn metadata(&self) -> HashMap<String, String>;
193
194 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
200 chunk
201 .first()
202 .map(|elem| elem.metadata())
203 .expect("Chunk must have at least one element to encode")
204 }
205}
206
/// Decodes typed elements from an Arrow record batch.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of `Self`, using `metadata`
    /// (e.g. instrument ID and precisions) to interpret raw column values.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if a column or metadata key is missing,
    /// mistyped, or fails to parse.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
222
/// Decodes type-erased [`Data`] values from an Arrow record batch.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of [`Data`], using `metadata`
    /// to interpret raw column values.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if a column or metadata key is missing,
    /// mistyped, or fails to parse.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
238
/// A destination that Arrow record batches can be streamed to.
pub trait WriteStream {
    /// Writes `record_batch` to this stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
248
249impl<T: Write> WriteStream for T {
250 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
251 let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
252 writer.write(record_batch)?;
253 writer.finish()?;
254 Ok(())
255 }
256}
257
258pub fn extract_column<'a, T: Array + 'static>(
266 cols: &'a [ArrayRef],
267 column_key: &'static str,
268 column_index: usize,
269 expected_type: DataType,
270) -> Result<&'a T, EncodingError> {
271 let column_values = cols
272 .get(column_index)
273 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
274 let downcasted_values =
275 column_values
276 .as_any()
277 .downcast_ref::<T>()
278 .ok_or(EncodingError::InvalidColumnType(
279 column_key,
280 column_index,
281 expected_type,
282 column_values.data_type().clone(),
283 ))?;
284 Ok(downcasted_values)
285}
286
287pub fn book_deltas_to_arrow_record_batch_bytes(
295 data: Vec<OrderBookDelta>,
296) -> Result<RecordBatch, EncodingError> {
297 if data.is_empty() {
298 return Err(EncodingError::EmptyData);
299 }
300
301 let metadata = OrderBookDelta::chunk_metadata(&data);
303 OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
304}
305
306pub fn book_depth10_to_arrow_record_batch_bytes(
318 data: Vec<OrderBookDepth10>,
319) -> Result<RecordBatch, EncodingError> {
320 if data.is_empty() {
321 return Err(EncodingError::EmptyData);
322 }
323
324 let first = data.first().unwrap();
327 let metadata = first.metadata();
328 OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
329}
330
331pub fn quotes_to_arrow_record_batch_bytes(
343 data: Vec<QuoteTick>,
344) -> Result<RecordBatch, EncodingError> {
345 if data.is_empty() {
346 return Err(EncodingError::EmptyData);
347 }
348
349 let first = data.first().unwrap();
352 let metadata = first.metadata();
353 QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
354}
355
356pub fn trades_to_arrow_record_batch_bytes(
368 data: Vec<TradeTick>,
369) -> Result<RecordBatch, EncodingError> {
370 if data.is_empty() {
371 return Err(EncodingError::EmptyData);
372 }
373
374 let first = data.first().unwrap();
377 let metadata = first.metadata();
378 TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
379}
380
381pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
393 if data.is_empty() {
394 return Err(EncodingError::EmptyData);
395 }
396
397 let first = data.first().unwrap();
400 let metadata = first.metadata();
401 Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
402}
403
404pub fn mark_prices_to_arrow_record_batch_bytes(
416 data: Vec<MarkPriceUpdate>,
417) -> Result<RecordBatch, EncodingError> {
418 if data.is_empty() {
419 return Err(EncodingError::EmptyData);
420 }
421
422 let first = data.first().unwrap();
425 let metadata = first.metadata();
426 MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
427}
428
429pub fn index_prices_to_arrow_record_batch_bytes(
441 data: Vec<IndexPriceUpdate>,
442) -> Result<RecordBatch, EncodingError> {
443 if data.is_empty() {
444 return Err(EncodingError::EmptyData);
445 }
446
447 let first = data.first().unwrap();
450 let metadata = first.metadata();
451 IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
452}
453
454pub fn instrument_closes_to_arrow_record_batch_bytes(
466 data: Vec<InstrumentClose>,
467) -> Result<RecordBatch, EncodingError> {
468 if data.is_empty() {
469 return Err(EncodingError::EmptyData);
470 }
471
472 let first = data.first().unwrap();
475 let metadata = first.metadata();
476 InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
477}