//! nautilus_serialization/arrow/mod.rs — Arrow (de)serialization for Nautilus market data types.
1pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28 collections::HashMap,
29 io::{self, Write},
30};
31
32use arrow::{
33 array::{Array, ArrayRef},
34 datatypes::{DataType, Schema},
35 error::ArrowError,
36 ipc::writer::StreamWriter,
37 record_batch::RecordBatch,
38};
39use nautilus_model::{
40 data::{
41 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43 },
44 types::{price::PriceRaw, quantity::QuantityRaw},
45};
46use pyo3::prelude::*;
47
/// Metadata key under which a bar type is recorded (presumably read by the `bar`
/// codec submodule — confirm against `bar.rs`).
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key under which an instrument ID is recorded in batch metadata.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price decimal precision of encoded values.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size (quantity) decimal precision of encoded values.
const KEY_SIZE_PRECISION: &str = "size_precision";
53
/// Errors that can occur while streaming encoded record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// Arrow encoding or IPC writing failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error surfaced from the Python interop layer.
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
63
/// Errors that can occur while encoding data to or decoding data from record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice contained no elements, so no metadata could be derived.
    #[error("Empty data")]
    EmptyData,
    /// A required key was absent from the batch metadata map.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The expected column (name, index) was absent from the record batch.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A metadata or column value failed to parse; carries the field name and detail.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had the wrong Arrow `DataType` (expected vs. found).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// A lower-level Arrow error.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
79
80#[inline]
81fn get_raw_price(bytes: &[u8]) -> PriceRaw {
82 PriceRaw::from_le_bytes(bytes.try_into().unwrap())
83}
84
85#[inline]
86fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
87 QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
88}
89
90pub trait ArrowSchemaProvider {
91 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
92
93 #[must_use]
94 fn get_schema_map() -> HashMap<String, String> {
95 let schema = Self::get_schema(None);
96 let mut map = HashMap::new();
97 for field in schema.fields() {
98 let name = field.name().to_string();
99 let data_type = format!("{:?}", field.data_type());
100 map.insert(name, data_type);
101 }
102 map
103 }
104}
105
106pub trait EncodeToRecordBatch
107where
108 Self: Sized + ArrowSchemaProvider,
109{
110 fn encode_batch(
111 metadata: &HashMap<String, String>,
112 data: &[Self],
113 ) -> Result<RecordBatch, ArrowError>;
114
115 fn metadata(&self) -> HashMap<String, String>;
116 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
117 chunk
118 .first()
119 .map(|elem| elem.metadata())
120 .expect("Chunk must have atleast one element to encode")
121 }
122}
123
/// Decodes a [`RecordBatch`] back into a vector of typed elements.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into `Vec<Self>`, using `metadata` for
    /// batch-level context (e.g. precisions/IDs — confirm per implementor).
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
133
/// Decodes a [`RecordBatch`] into type-erased [`Data`] values rather than the
/// concrete element type.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into `Vec<Data>`, using `metadata` for
    /// batch-level context.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
143
/// A sink that can persist a [`RecordBatch`] (e.g. as an Arrow IPC stream).
pub trait WriteStream {
    /// Writes `record_batch` to the underlying stream.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
147
// Blanket impl: any writer that also encodes batches can serve as a stream sink.
// NOTE(review): the `EncodeToRecordBatch` bound is not used in this body — confirm
// it is intentional rather than an over-constraint.
impl<T: EncodeToRecordBatch + Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // A fresh `StreamWriter` per call means each call emits a complete IPC
        // stream (schema header + batch + end marker) rather than appending to
        // one ongoing stream — presumably intended framing; confirm with readers.
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
156
157pub fn extract_column<'a, T: Array + 'static>(
158 cols: &'a [ArrayRef],
159 column_key: &'static str,
160 column_index: usize,
161 expected_type: DataType,
162) -> Result<&'a T, EncodingError> {
163 let column_values = cols
164 .get(column_index)
165 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
166 let downcasted_values =
167 column_values
168 .as_any()
169 .downcast_ref::<T>()
170 .ok_or(EncodingError::InvalidColumnType(
171 column_key,
172 column_index,
173 expected_type,
174 column_values.data_type().clone(),
175 ))?;
176 Ok(downcasted_values)
177}
178
179pub fn book_deltas_to_arrow_record_batch_bytes(
180 data: Vec<OrderBookDelta>,
181) -> Result<RecordBatch, EncodingError> {
182 if data.is_empty() {
183 return Err(EncodingError::EmptyData);
184 }
185
186 let metadata = OrderBookDelta::chunk_metadata(&data);
188 OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
189}
190
191pub fn book_depth10_to_arrow_record_batch_bytes(
192 data: Vec<OrderBookDepth10>,
193) -> Result<RecordBatch, EncodingError> {
194 if data.is_empty() {
195 return Err(EncodingError::EmptyData);
196 }
197
198 let first = data.first().unwrap();
201 let metadata = first.metadata();
202 OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
203}
204
205pub fn quotes_to_arrow_record_batch_bytes(
206 data: Vec<QuoteTick>,
207) -> Result<RecordBatch, EncodingError> {
208 if data.is_empty() {
209 return Err(EncodingError::EmptyData);
210 }
211
212 let first = data.first().unwrap();
215 let metadata = first.metadata();
216 QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
217}
218
219pub fn trades_to_arrow_record_batch_bytes(
220 data: Vec<TradeTick>,
221) -> Result<RecordBatch, EncodingError> {
222 if data.is_empty() {
223 return Err(EncodingError::EmptyData);
224 }
225
226 let first = data.first().unwrap();
229 let metadata = first.metadata();
230 TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
231}
232
233pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
234 if data.is_empty() {
235 return Err(EncodingError::EmptyData);
236 }
237
238 let first = data.first().unwrap();
241 let metadata = first.metadata();
242 Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
243}
244
245pub fn mark_prices_to_arrow_record_batch_bytes(
246 data: Vec<MarkPriceUpdate>,
247) -> Result<RecordBatch, EncodingError> {
248 if data.is_empty() {
249 return Err(EncodingError::EmptyData);
250 }
251
252 let first = data.first().unwrap();
255 let metadata = first.metadata();
256 MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
257}
258
259pub fn index_prices_to_arrow_record_batch_bytes(
260 data: Vec<IndexPriceUpdate>,
261) -> Result<RecordBatch, EncodingError> {
262 if data.is_empty() {
263 return Err(EncodingError::EmptyData);
264 }
265
266 let first = data.first().unwrap();
269 let metadata = first.metadata();
270 IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
271}
272
273pub fn instrument_closes_to_arrow_record_batch_bytes(
274 data: Vec<InstrumentClose>,
275) -> Result<RecordBatch, EncodingError> {
276 if data.is_empty() {
277 return Err(EncodingError::EmptyData);
278 }
279
280 let first = data.first().unwrap();
283 let metadata = first.metadata();
284 InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
285}