1pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28 collections::HashMap,
29 io::{self, Write},
30};
31
32use arrow::{
33 array::{Array, ArrayRef, FixedSizeBinaryArray},
34 datatypes::{DataType, Schema},
35 error::ArrowError,
36 ipc::writer::StreamWriter,
37 record_batch::RecordBatch,
38};
39use nautilus_model::{
40 data::{
41 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43 },
44 types::{
45 PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
46 fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
47 price::PriceRaw,
48 quantity::QuantityRaw,
49 },
50};
51#[cfg(feature = "python")]
52use pyo3::prelude::*;
53
/// Metadata key under which a bar type is stored on an encoded record batch.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key under which an instrument ID is stored on an encoded record batch.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key under which the price precision is stored on an encoded record batch.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key under which the size precision is stored on an encoded record batch.
const KEY_SIZE_PRECISION: &str = "size_precision";
59
/// Errors that can occur while streaming record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O failure while writing to the stream.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// A failure raised by the Arrow layer (e.g. the IPC stream writer).
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error propagated from Python (only compiled with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
70
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice/vector contained no elements to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column was absent at the expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value failed to parse; the string carries row-level detail.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had a different Arrow `DataType` than expected.
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// The catalog's fixed-size binary width does not match this build's
    /// `PRECISION_BYTES` (standard vs high precision mode).
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
        but this build expects {expected_bytes} bytes. The catalog was created with a different \
        precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
        build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        field: &'static str,
        expected_bytes: i32,
        actual_bytes: i32,
    },
    /// A failure raised by the Arrow layer.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
97
98#[inline]
99fn get_raw_price(bytes: &[u8]) -> PriceRaw {
100 PriceRaw::from_le_bytes(
101 bytes
102 .try_into()
103 .expect("Price raw bytes must be exactly the size of PriceRaw"),
104 )
105}
106
107#[inline]
108fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
109 QuantityRaw::from_le_bytes(
110 bytes
111 .try_into()
112 .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
113 )
114}
115
116#[inline]
124fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
125 let raw = get_raw_price(bytes);
126
127 if raw == PRICE_UNDEF || raw == PRICE_ERROR {
129 return raw;
130 }
131
132 correct_price_raw(raw, precision)
133}
134
135#[inline]
143fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
144 let raw = get_raw_quantity(bytes);
145
146 if raw == QUANTITY_UNDEF {
148 return raw;
149 }
150
151 correct_quantity_raw(raw, precision)
152}
153
154fn decode_price(
159 bytes: &[u8],
160 precision: u8,
161 field: &'static str,
162 row: usize,
163) -> Result<Price, EncodingError> {
164 let raw = get_corrected_raw_price(bytes, precision);
165 Price::from_raw_checked(raw, precision)
166 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
167}
168
169fn decode_quantity(
174 bytes: &[u8],
175 precision: u8,
176 field: &'static str,
177 row: usize,
178) -> Result<Quantity, EncodingError> {
179 let raw = get_corrected_raw_quantity(bytes, precision);
180 Quantity::from_raw_checked(raw, precision)
181 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
182}
183
184fn decode_price_with_sentinel(
188 bytes: &[u8],
189 precision: u8,
190 field: &'static str,
191 row: usize,
192) -> Result<Price, EncodingError> {
193 let raw = get_raw_price(bytes);
194 let (final_raw, final_precision) = if raw == PRICE_UNDEF {
195 (raw, 0)
196 } else {
197 (get_corrected_raw_price(bytes, precision), precision)
198 };
199 Price::from_raw_checked(final_raw, final_precision)
200 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
201}
202
203fn decode_quantity_with_sentinel(
207 bytes: &[u8],
208 precision: u8,
209 field: &'static str,
210 row: usize,
211) -> Result<Quantity, EncodingError> {
212 let raw = get_raw_quantity(bytes);
213 let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
214 (raw, 0)
215 } else {
216 (get_corrected_raw_quantity(bytes, precision), precision)
217 };
218 Quantity::from_raw_checked(final_raw, final_precision)
219 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
220}
221
222pub trait ArrowSchemaProvider {
224 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
226
227 #[must_use]
229 fn get_schema_map() -> HashMap<String, String> {
230 let schema = Self::get_schema(None);
231 let mut map = HashMap::new();
232 for field in schema.fields() {
233 let name = field.name().clone();
234 let data_type = format!("{:?}", field.data_type());
235 map.insert(name, data_type);
236 }
237 map
238 }
239}
240
241pub trait EncodeToRecordBatch
243where
244 Self: Sized + ArrowSchemaProvider,
245{
246 fn encode_batch(
252 metadata: &HashMap<String, String>,
253 data: &[Self],
254 ) -> Result<RecordBatch, ArrowError>;
255
256 fn metadata(&self) -> HashMap<String, String>;
258
259 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
265 chunk
266 .first()
267 .map(|elem| elem.metadata())
268 .expect("Chunk must have at least one element to encode")
269 }
270}
271
/// Decodes typed elements from an Arrow record batch.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `Vec<Self>` from `record_batch`, using `metadata` (e.g. the
    /// `KEY_*` entries defined in this module) to interpret the raw columns.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
287
/// Decodes type-erased [`Data`] elements from an Arrow record batch.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `Vec<Data>` from `record_batch`, using `metadata` (e.g. the
    /// `KEY_*` entries defined in this module) to interpret the raw columns.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
303
/// A sink to which Arrow record batches can be written.
pub trait WriteStream {
    /// Writes `record_batch` to the stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
313
314impl<T: Write> WriteStream for T {
315 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
316 let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
317 writer.write(record_batch)?;
318 writer.finish()?;
319 Ok(())
320 }
321}
322
323pub fn extract_column<'a, T: Array + 'static>(
331 cols: &'a [ArrayRef],
332 column_key: &'static str,
333 column_index: usize,
334 expected_type: DataType,
335) -> Result<&'a T, EncodingError> {
336 let column_values = cols
337 .get(column_index)
338 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
339 let downcasted_values =
340 column_values
341 .as_any()
342 .downcast_ref::<T>()
343 .ok_or(EncodingError::InvalidColumnType(
344 column_key,
345 column_index,
346 expected_type,
347 column_values.data_type().clone(),
348 ))?;
349 Ok(downcasted_values)
350}
351
352pub fn validate_precision_bytes(
362 array: &FixedSizeBinaryArray,
363 field: &'static str,
364) -> Result<(), EncodingError> {
365 let actual = array.value_length();
366 if actual != PRECISION_BYTES {
367 return Err(EncodingError::PrecisionMismatch {
368 field,
369 expected_bytes: PRECISION_BYTES,
370 actual_bytes: actual,
371 });
372 }
373 Ok(())
374}
375
376pub fn book_deltas_to_arrow_record_batch_bytes(
384 data: Vec<OrderBookDelta>,
385) -> Result<RecordBatch, EncodingError> {
386 if data.is_empty() {
387 return Err(EncodingError::EmptyData);
388 }
389
390 let metadata = OrderBookDelta::chunk_metadata(&data);
392 OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
393}
394
395pub fn book_depth10_to_arrow_record_batch_bytes(
407 data: Vec<OrderBookDepth10>,
408) -> Result<RecordBatch, EncodingError> {
409 if data.is_empty() {
410 return Err(EncodingError::EmptyData);
411 }
412
413 let first = data.first().unwrap();
416 let metadata = first.metadata();
417 OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
418}
419
420pub fn quotes_to_arrow_record_batch_bytes(
432 data: Vec<QuoteTick>,
433) -> Result<RecordBatch, EncodingError> {
434 if data.is_empty() {
435 return Err(EncodingError::EmptyData);
436 }
437
438 let first = data.first().unwrap();
441 let metadata = first.metadata();
442 QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
443}
444
445pub fn trades_to_arrow_record_batch_bytes(
457 data: Vec<TradeTick>,
458) -> Result<RecordBatch, EncodingError> {
459 if data.is_empty() {
460 return Err(EncodingError::EmptyData);
461 }
462
463 let first = data.first().unwrap();
466 let metadata = first.metadata();
467 TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
468}
469
470pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
482 if data.is_empty() {
483 return Err(EncodingError::EmptyData);
484 }
485
486 let first = data.first().unwrap();
489 let metadata = first.metadata();
490 Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
491}
492
493pub fn mark_prices_to_arrow_record_batch_bytes(
505 data: Vec<MarkPriceUpdate>,
506) -> Result<RecordBatch, EncodingError> {
507 if data.is_empty() {
508 return Err(EncodingError::EmptyData);
509 }
510
511 let first = data.first().unwrap();
514 let metadata = first.metadata();
515 MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
516}
517
518pub fn index_prices_to_arrow_record_batch_bytes(
530 data: Vec<IndexPriceUpdate>,
531) -> Result<RecordBatch, EncodingError> {
532 if data.is_empty() {
533 return Err(EncodingError::EmptyData);
534 }
535
536 let first = data.first().unwrap();
539 let metadata = first.metadata();
540 IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
541}
542
543pub fn instrument_closes_to_arrow_record_batch_bytes(
555 data: Vec<InstrumentClose>,
556) -> Result<RecordBatch, EncodingError> {
557 if data.is_empty() {
558 return Err(EncodingError::EmptyData);
559 }
560
561 let first = data.first().unwrap();
564 let metadata = first.metadata();
565 InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
566}