nautilus_serialization/arrow/mod.rs

pub mod bar;
pub mod close;
pub mod delta;
pub mod depth;
pub mod index_price;
pub mod mark_price;
pub mod quote;
pub mod trade;

use std::{
    collections::HashMap,
    io::{self, Write},
};

use arrow::{
    array::{Array, ArrayRef},
    datatypes::{DataType, Schema},
    error::ArrowError,
    ipc::writer::StreamWriter,
    record_batch::RecordBatch,
};
use nautilus_model::{
    data::{
        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
    },
    types::{price::PriceRaw, quantity::QuantityRaw},
};
#[cfg(feature = "python")]
use pyo3::prelude::*;

const KEY_BAR_TYPE: &str = "bar_type";
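/// Metadata key under which the instrument ID is stored.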
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
const KEY_PRICE_PRECISION: &str = "price_precision";
const KEY_SIZE_PRECISION: &str = "size_precision";
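
/// Represents errors that can occur when streaming record batches to a writer.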
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
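
/// Represents errors that can occur when encoding or decoding Arrow record batches.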
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    #[error("Empty data")]
    EmptyData,
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
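
/// Decodes a `PriceRaw` from little-endian bytes.
///
/// # Panics
///
/// Panics if `bytes` is not exactly `size_of::<PriceRaw>()` bytes long.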
#[inline]
fn get_raw_price(bytes: &[u8]) -> PriceRaw {
    PriceRaw::from_le_bytes(
        bytes
            .try_into()
            .expect("Price raw bytes must be exactly the size of PriceRaw"),
    )
}
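
/// Decodes a `QuantityRaw` from little-endian bytes.
///
/// # Panics
///
/// Panics if `bytes` is not exactly `size_of::<QuantityRaw>()` bytes long.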
#[inline]
fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
    QuantityRaw::from_le_bytes(
        bytes
            .try_into()
            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
    )
}
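
/// Provides an Arrow schema for a type, plus a map of field names to data types.
///
/// # Examples
///
/// An illustrative sketch (assuming an implementor from this crate, such as
/// `QuoteTick`):
///
/// ```ignore
/// let schema = QuoteTick::get_schema(None);
/// let schema_map = QuoteTick::get_schema_map();
/// assert_eq!(schema.fields().len(), schema_map.len());
/// ```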
pub trait ArrowSchemaProvider {
    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;

    #[must_use]
    fn get_schema_map() -> HashMap<String, String> {
        let schema = Self::get_schema(None);
        let mut map = HashMap::new();
        for field in schema.fields() {
            let name = field.name().to_string();
            let data_type = format!("{:?}", field.data_type());
            map.insert(name, data_type);
        }
        map
    }
}
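
/// Encodes a slice of values into an Arrow `RecordBatch` using the given metadata.
///
/// # Examples
///
/// An illustrative sketch (assuming `ticks: Vec<QuoteTick>` is non-empty, since
/// `chunk_metadata` panics on an empty chunk):
///
/// ```ignore
/// let metadata = QuoteTick::chunk_metadata(&ticks);
/// let batch = QuoteTick::encode_batch(&metadata, &ticks)?;
/// ```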
pub trait EncodeToRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    fn encode_batch(
        metadata: &HashMap<String, String>,
        data: &[Self],
    ) -> Result<RecordBatch, ArrowError>;

    fn metadata(&self) -> HashMap<String, String>;

    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
        chunk
            .first()
            .map(|elem| elem.metadata())
            .expect("Chunk must have at least one element to encode")
    }
}
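
/// Decodes a `RecordBatch` into a vector of `Self` using the given metadata.
///
/// # Examples
///
/// An illustrative sketch:
///
/// ```ignore
/// let quotes: Vec<QuoteTick> = QuoteTick::decode_batch(&metadata, record_batch)?;
/// ```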
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
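
/// Decodes a `RecordBatch` into a vector of `Data` variants using the given metadata.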
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
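
/// Writes Arrow record batches to an underlying output stream.
///
/// # Examples
///
/// An illustrative sketch writing to an in-memory buffer (any `std::io::Write`
/// implementor works via the blanket impl below; the call is fully qualified to
/// avoid ambiguity with `io::Write::write`):
///
/// ```ignore
/// let mut buffer: Vec<u8> = Vec::new();
/// WriteStream::write(&mut buffer, &record_batch)?;
/// ```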
pub trait WriteStream {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}

impl<T: Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
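
/// Extracts the column at `column_index` from `cols` and downcasts it to `T`.
///
/// Returns `EncodingError::MissingColumn` if the index is out of bounds, or
/// `EncodingError::InvalidColumnType` if the downcast fails.
///
/// # Examples
///
/// An illustrative sketch (the `ts_event` column name and `UInt64` type are
/// assumptions for the example):
///
/// ```ignore
/// use arrow::array::UInt64Array;
///
/// let ts_event = extract_column::<UInt64Array>(
///     record_batch.columns(),
///     "ts_event",
///     0,
///     DataType::UInt64,
/// )?;
/// ```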
pub fn extract_column<'a, T: Array + 'static>(
    cols: &'a [ArrayRef],
    column_key: &'static str,
    column_index: usize,
    expected_type: DataType,
) -> Result<&'a T, EncodingError> {
    let column_values = cols
        .get(column_index)
        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
    let downcasted_values =
        column_values
            .as_any()
            .downcast_ref::<T>()
            .ok_or(EncodingError::InvalidColumnType(
                column_key,
                column_index,
                expected_type,
                column_values.data_type().clone(),
            ))?;
    Ok(downcasted_values)
}
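
/// Encodes a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.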
pub fn book_deltas_to_arrow_record_batch_bytes(
    data: Vec<OrderBookDelta>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    let metadata = OrderBookDelta::chunk_metadata(&data);
    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
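
/// Encodes a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.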
pub fn book_depth10_to_arrow_record_batch_bytes(
    data: Vec<OrderBookDepth10>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
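
/// Encodes a vector of `QuoteTick` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.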
pub fn quotes_to_arrow_record_batch_bytes(
    data: Vec<QuoteTick>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
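
/// Encodes a vector of `TradeTick` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.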
pub fn trades_to_arrow_record_batch_bytes(
    data: Vec<TradeTick>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
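
/// Encodes a vector of `Bar` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.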
pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
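
/// Encodes a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.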
pub fn mark_prices_to_arrow_record_batch_bytes(
    data: Vec<MarkPriceUpdate>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
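
/// Encodes a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.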
pub fn index_prices_to_arrow_record_batch_bytes(
    data: Vec<IndexPriceUpdate>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}
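
/// Encodes a vector of `InstrumentClose` into an Arrow `RecordBatch`.
///
/// Returns `EncodingError::EmptyData` if `data` is empty.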
pub fn instrument_closes_to_arrow_record_batch_bytes(
    data: Vec<InstrumentClose>,
) -> Result<RecordBatch, EncodingError> {
    if data.is_empty() {
        return Err(EncodingError::EmptyData);
    }

    // SAFETY: Unwrap is safe as `data` was checked to be non-empty above
    let first = data.first().unwrap();
    let metadata = first.metadata();
    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
}