1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{
20 Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt32Array, UInt64Array,
21 },
22 datatypes::{DataType, Field, Schema},
23 error::ArrowError,
24 record_batch::RecordBatch,
25};
26use nautilus_model::{
27 data::{
28 depth::{DEPTH10_LEN, OrderBookDepth10},
29 order::BookOrder,
30 },
31 enums::OrderSide,
32 identifiers::InstrumentId,
33 types::fixed::PRECISION_BYTES,
34};
35
36use super::{
37 DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
38 KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column,
39};
40use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
41
42fn get_field_data() -> Vec<(&'static str, DataType)> {
43 vec![
44 ("bid_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
45 ("ask_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
46 ("bid_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
47 ("ask_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
48 ("bid_count", DataType::UInt32),
49 ("ask_count", DataType::UInt32),
50 ]
51}
52
53impl ArrowSchemaProvider for OrderBookDepth10 {
54 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
55 let mut fields = Vec::new();
56 let field_data = get_field_data();
57
58 for (name, data_type) in field_data {
61 for i in 0..DEPTH10_LEN {
62 fields.push(Field::new(format!("{name}_{i}"), data_type.clone(), false));
63 }
64 }
65
66 fields.push(Field::new("flags", DataType::UInt8, false));
67 fields.push(Field::new("sequence", DataType::UInt64, false));
68 fields.push(Field::new("ts_event", DataType::UInt64, false));
69 fields.push(Field::new("ts_init", DataType::UInt64, false));
70
71 match metadata {
72 Some(metadata) => Schema::new_with_metadata(fields, metadata),
73 None => Schema::new(fields),
74 }
75 }
76}
77
78fn parse_metadata(
79 metadata: &HashMap<String, String>,
80) -> Result<(InstrumentId, u8, u8), EncodingError> {
81 let instrument_id_str = metadata
82 .get(KEY_INSTRUMENT_ID)
83 .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
84 let instrument_id = InstrumentId::from_str(instrument_id_str)
85 .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
86
87 let price_precision = metadata
88 .get(KEY_PRICE_PRECISION)
89 .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
90 .parse::<u8>()
91 .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
92
93 let size_precision = metadata
94 .get(KEY_SIZE_PRECISION)
95 .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
96 .parse::<u8>()
97 .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
98
99 Ok((instrument_id, price_precision, size_precision))
100}
101
102impl EncodeToRecordBatch for OrderBookDepth10 {
103 fn encode_batch(
104 metadata: &HashMap<String, String>,
105 data: &[Self],
106 ) -> Result<RecordBatch, ArrowError> {
107 let mut bid_price_builders = Vec::with_capacity(DEPTH10_LEN);
108 let mut ask_price_builders = Vec::with_capacity(DEPTH10_LEN);
109 let mut bid_size_builders = Vec::with_capacity(DEPTH10_LEN);
110 let mut ask_size_builders = Vec::with_capacity(DEPTH10_LEN);
111 let mut bid_count_builders = Vec::with_capacity(DEPTH10_LEN);
112 let mut ask_count_builders = Vec::with_capacity(DEPTH10_LEN);
113
114 for _ in 0..DEPTH10_LEN {
115 bid_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
116 data.len(),
117 PRECISION_BYTES,
118 ));
119 ask_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
120 data.len(),
121 PRECISION_BYTES,
122 ));
123 bid_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
124 data.len(),
125 PRECISION_BYTES,
126 ));
127 ask_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
128 data.len(),
129 PRECISION_BYTES,
130 ));
131 bid_count_builders.push(UInt32Array::builder(data.len()));
132 ask_count_builders.push(UInt32Array::builder(data.len()));
133 }
134
135 let mut flags_builder = UInt8Array::builder(data.len());
136 let mut sequence_builder = UInt64Array::builder(data.len());
137 let mut ts_event_builder = UInt64Array::builder(data.len());
138 let mut ts_init_builder = UInt64Array::builder(data.len());
139
140 for depth in data {
141 for i in 0..DEPTH10_LEN {
142 bid_price_builders[i]
143 .append_value(depth.bids[i].price.raw.to_le_bytes())
144 .unwrap();
145 ask_price_builders[i]
146 .append_value(depth.asks[i].price.raw.to_le_bytes())
147 .unwrap();
148 bid_size_builders[i]
149 .append_value(depth.bids[i].size.raw.to_le_bytes())
150 .unwrap();
151 ask_size_builders[i]
152 .append_value(depth.asks[i].size.raw.to_le_bytes())
153 .unwrap();
154 bid_count_builders[i].append_value(depth.bid_counts[i]);
155 ask_count_builders[i].append_value(depth.ask_counts[i]);
156 }
157
158 flags_builder.append_value(depth.flags);
159 sequence_builder.append_value(depth.sequence);
160 ts_event_builder.append_value(depth.ts_event.as_u64());
161 ts_init_builder.append_value(depth.ts_init.as_u64());
162 }
163
164 let bid_price_arrays = bid_price_builders
165 .into_iter()
166 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
167 .collect::<Vec<_>>();
168 let ask_price_arrays = ask_price_builders
169 .into_iter()
170 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
171 .collect::<Vec<_>>();
172 let bid_size_arrays = bid_size_builders
173 .into_iter()
174 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
175 .collect::<Vec<_>>();
176 let ask_size_arrays = ask_size_builders
177 .into_iter()
178 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
179 .collect::<Vec<_>>();
180 let bid_count_arrays = bid_count_builders
181 .into_iter()
182 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
183 .collect::<Vec<_>>();
184 let ask_count_arrays = ask_count_builders
185 .into_iter()
186 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
187 .collect::<Vec<_>>();
188
189 let flags_array = Arc::new(flags_builder.finish()) as Arc<dyn Array>;
190 let sequence_array = Arc::new(sequence_builder.finish()) as Arc<dyn Array>;
191 let ts_event_array = Arc::new(ts_event_builder.finish()) as Arc<dyn Array>;
192 let ts_init_array = Arc::new(ts_init_builder.finish()) as Arc<dyn Array>;
193
194 let mut columns = Vec::new();
195 columns.extend(bid_price_arrays);
196 columns.extend(ask_price_arrays);
197 columns.extend(bid_size_arrays);
198 columns.extend(ask_size_arrays);
199 columns.extend(bid_count_arrays);
200 columns.extend(ask_count_arrays);
201 columns.push(flags_array);
202 columns.push(sequence_array);
203 columns.push(ts_event_array);
204 columns.push(ts_init_array);
205
206 RecordBatch::try_new(Self::get_schema(Some(metadata.clone())).into(), columns)
207 }
208
209 fn metadata(&self) -> HashMap<String, String> {
210 Self::get_metadata(
211 &self.instrument_id,
212 self.bids[0].price.precision,
213 self.bids[0].size.precision,
214 )
215 }
216}
217
218impl DecodeFromRecordBatch for OrderBookDepth10 {
219 fn decode_batch(
220 metadata: &HashMap<String, String>,
221 record_batch: RecordBatch,
222 ) -> Result<Vec<Self>, EncodingError> {
223 let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
224 let cols = record_batch.columns();
225
226 let mut bid_prices = Vec::with_capacity(DEPTH10_LEN);
227 let mut ask_prices = Vec::with_capacity(DEPTH10_LEN);
228 let mut bid_sizes = Vec::with_capacity(DEPTH10_LEN);
229 let mut ask_sizes = Vec::with_capacity(DEPTH10_LEN);
230 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
231 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
232
233 macro_rules! extract_depth_column {
234 ($array:ty, $name:literal, $i:expr, $offset:expr, $type:expr) => {
235 extract_column::<$array>(cols, concat!($name, "_", stringify!($i)), $offset, $type)?
236 };
237 }
238
239 for i in 0..DEPTH10_LEN {
240 bid_prices.push(extract_depth_column!(
241 FixedSizeBinaryArray,
242 "bid_price",
243 i,
244 i,
245 DataType::FixedSizeBinary(PRECISION_BYTES)
246 ));
247 ask_prices.push(extract_depth_column!(
248 FixedSizeBinaryArray,
249 "ask_price",
250 i,
251 DEPTH10_LEN + i,
252 DataType::FixedSizeBinary(PRECISION_BYTES)
253 ));
254 bid_sizes.push(extract_depth_column!(
255 FixedSizeBinaryArray,
256 "bid_size",
257 i,
258 2 * DEPTH10_LEN + i,
259 DataType::FixedSizeBinary(PRECISION_BYTES)
260 ));
261 ask_sizes.push(extract_depth_column!(
262 FixedSizeBinaryArray,
263 "ask_size",
264 i,
265 3 * DEPTH10_LEN + i,
266 DataType::FixedSizeBinary(PRECISION_BYTES)
267 ));
268 bid_counts.push(extract_depth_column!(
269 UInt32Array,
270 "bid_count",
271 i,
272 4 * DEPTH10_LEN + i,
273 DataType::UInt32
274 ));
275 ask_counts.push(extract_depth_column!(
276 UInt32Array,
277 "ask_count",
278 i,
279 5 * DEPTH10_LEN + i,
280 DataType::UInt32
281 ));
282 }
283
284 for i in 0..DEPTH10_LEN {
285 if bid_prices[i].value_length() != PRECISION_BYTES {
286 return Err(EncodingError::ParseError(
287 "bid_price",
288 format!(
289 "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
290 bid_prices[i].value_length()
291 ),
292 ));
293 }
294 if ask_prices[i].value_length() != PRECISION_BYTES {
295 return Err(EncodingError::ParseError(
296 "ask_price",
297 format!(
298 "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
299 ask_prices[i].value_length()
300 ),
301 ));
302 }
303 if bid_sizes[i].value_length() != PRECISION_BYTES {
304 return Err(EncodingError::ParseError(
305 "bid_size",
306 format!(
307 "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
308 bid_sizes[i].value_length()
309 ),
310 ));
311 }
312 if ask_sizes[i].value_length() != PRECISION_BYTES {
313 return Err(EncodingError::ParseError(
314 "ask_size",
315 format!(
316 "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
317 ask_sizes[i].value_length()
318 ),
319 ));
320 }
321 }
322
323 let flags = extract_column::<UInt8Array>(cols, "flags", 6 * DEPTH10_LEN, DataType::UInt8)?;
324 let sequence =
325 extract_column::<UInt64Array>(cols, "sequence", 6 * DEPTH10_LEN + 1, DataType::UInt64)?;
326 let ts_event =
327 extract_column::<UInt64Array>(cols, "ts_event", 6 * DEPTH10_LEN + 2, DataType::UInt64)?;
328 let ts_init =
329 extract_column::<UInt64Array>(cols, "ts_init", 6 * DEPTH10_LEN + 3, DataType::UInt64)?;
330
331 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
333 .map(|row| {
334 let mut bids = [BookOrder::default(); DEPTH10_LEN];
335 let mut asks = [BookOrder::default(); DEPTH10_LEN];
336 let mut bid_count_arr = [0u32; DEPTH10_LEN];
337 let mut ask_count_arr = [0u32; DEPTH10_LEN];
338
339 for i in 0..DEPTH10_LEN {
340 let bid_price =
341 decode_price(bid_prices[i].value(row), price_precision, "bid_price", row)?;
342 let bid_size =
343 decode_quantity(bid_sizes[i].value(row), size_precision, "bid_size", row)?;
344 bids[i] = BookOrder::new(OrderSide::Buy, bid_price, bid_size, 0);
345
346 let ask_price =
347 decode_price(ask_prices[i].value(row), price_precision, "ask_price", row)?;
348 let ask_size =
349 decode_quantity(ask_sizes[i].value(row), size_precision, "ask_size", row)?;
350 asks[i] = BookOrder::new(OrderSide::Sell, ask_price, ask_size, 0);
351
352 bid_count_arr[i] = bid_counts[i].value(row);
353 ask_count_arr[i] = ask_counts[i].value(row);
354 }
355
356 Ok(Self {
357 instrument_id,
358 bids,
359 asks,
360 bid_counts: bid_count_arr,
361 ask_counts: ask_count_arr,
362 flags: flags.value(row),
363 sequence: sequence.value(row),
364 ts_event: ts_event.value(row).into(),
365 ts_init: ts_init.value(row).into(),
366 })
367 })
368 .collect();
369
370 result
371 }
372}
373
374impl DecodeDataFromRecordBatch for OrderBookDepth10 {
375 fn decode_data_batch(
376 metadata: &HashMap<String, String>,
377 record_batch: RecordBatch,
378 ) -> Result<Vec<Data>, EncodingError> {
379 let depths: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
380 Ok(depths.into_iter().map(Data::from).collect())
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use arrow::datatypes::{DataType, Field};
387 use nautilus_model::{
388 data::stubs::stub_depth10,
389 types::{Price, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
390 };
391 use pretty_assertions::assert_eq;
392 use rstest::rstest;
393
394 use super::*;
395 use crate::arrow::{get_raw_price, get_raw_quantity};
396
397 #[rstest]
398 fn test_get_schema() {
399 let instrument_id = InstrumentId::from("AAPL.XNAS");
400 let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
401 let schema = OrderBookDepth10::get_schema(Some(metadata));
402
403 let mut group_count = 0;
404 let field_data = get_field_data();
405 for (name, data_type) in field_data {
406 for i in 0..DEPTH10_LEN {
407 let field = schema.field(i + group_count * DEPTH10_LEN).clone();
408 assert_eq!(
409 field,
410 Field::new(format!("{name}_{i}"), data_type.clone(), false)
411 );
412 }
413
414 group_count += 1;
415 }
416
417 let flags_field = schema.field(group_count * DEPTH10_LEN).clone();
418 assert_eq!(flags_field, Field::new("flags", DataType::UInt8, false));
419 let sequence_field = schema.field(group_count * DEPTH10_LEN + 1).clone();
420 assert_eq!(
421 sequence_field,
422 Field::new("sequence", DataType::UInt64, false)
423 );
424 let ts_event_field = schema.field(group_count * DEPTH10_LEN + 2).clone();
425 assert_eq!(
426 ts_event_field,
427 Field::new("ts_event", DataType::UInt64, false)
428 );
429 let ts_init_field = schema.field(group_count * DEPTH10_LEN + 3).clone();
430 assert_eq!(
431 ts_init_field,
432 Field::new("ts_init", DataType::UInt64, false)
433 );
434
435 assert_eq!(schema.metadata()["instrument_id"], "AAPL.XNAS");
436 assert_eq!(schema.metadata()["price_precision"], "2");
437 assert_eq!(schema.metadata()["size_precision"], "0");
438 }
439
440 #[rstest]
441 fn test_get_schema_map() {
442 let schema_map = OrderBookDepth10::get_schema_map();
443
444 let field_data = get_field_data();
445 for (name, data_type) in field_data {
446 for i in 0..DEPTH10_LEN {
447 let field = schema_map.get(&format!("{name}_{i}")).map(String::as_str);
448 assert_eq!(field, Some(format!("{data_type:?}").as_str()));
449 }
450 }
451
452 assert_eq!(schema_map.get("flags").map(String::as_str), Some("UInt8"));
453 assert_eq!(
454 schema_map.get("sequence").map(String::as_str),
455 Some("UInt64")
456 );
457 assert_eq!(
458 schema_map.get("ts_event").map(String::as_str),
459 Some("UInt64")
460 );
461 assert_eq!(
462 schema_map.get("ts_init").map(String::as_str),
463 Some("UInt64")
464 );
465 }
466
467 #[rstest]
468 fn test_encode_batch(stub_depth10: OrderBookDepth10) {
469 let instrument_id = InstrumentId::from("AAPL.XNAS");
470 let price_precision = 2;
471 let metadata = OrderBookDepth10::get_metadata(&instrument_id, price_precision, 0);
472
473 let data = vec![stub_depth10];
474 let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
475 let columns = record_batch.columns();
476
477 assert_eq!(columns.len(), DEPTH10_LEN * 6 + 4);
478
479 let bid_prices: Vec<_> = (0..DEPTH10_LEN)
481 .map(|i| {
482 columns[i]
483 .as_any()
484 .downcast_ref::<FixedSizeBinaryArray>()
485 .unwrap()
486 })
487 .collect();
488
489 let expected_bid_prices: Vec<f64> =
490 vec![99.0, 98.0, 97.0, 96.0, 95.0, 94.0, 93.0, 92.0, 91.0, 90.0];
491
492 for (i, bid_price) in bid_prices.iter().enumerate() {
493 assert_eq!(bid_price.len(), 1);
494 assert_eq!(
495 get_raw_price(bid_price.value(0)),
496 (expected_bid_prices[i] * FIXED_SCALAR) as PriceRaw
497 );
498 assert_eq!(
499 Price::from_raw(get_raw_price(bid_price.value(0)), price_precision).as_f64(),
500 expected_bid_prices[i]
501 );
502 }
503
504 let ask_prices: Vec<_> = (0..DEPTH10_LEN)
506 .map(|i| {
507 columns[DEPTH10_LEN + i]
508 .as_any()
509 .downcast_ref::<FixedSizeBinaryArray>()
510 .unwrap()
511 })
512 .collect();
513
514 let expected_ask_prices: Vec<f64> = vec![
515 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0,
516 ];
517
518 for (i, ask_price) in ask_prices.iter().enumerate() {
519 assert_eq!(ask_price.len(), 1);
520 assert_eq!(
521 get_raw_price(ask_price.value(0)),
522 (expected_ask_prices[i] * FIXED_SCALAR) as PriceRaw
523 );
524 assert_eq!(
525 Price::from_raw(get_raw_price(ask_price.value(0)), price_precision).as_f64(),
526 expected_ask_prices[i]
527 );
528 }
529
530 let bid_sizes: Vec<_> = (0..DEPTH10_LEN)
532 .map(|i| {
533 columns[2 * DEPTH10_LEN + i]
534 .as_any()
535 .downcast_ref::<FixedSizeBinaryArray>()
536 .unwrap()
537 })
538 .collect();
539
540 for (i, bid_size) in bid_sizes.iter().enumerate() {
541 assert_eq!(bid_size.len(), 1);
542 assert_eq!(
543 get_raw_quantity(bid_size.value(0)),
544 ((100.0 * FIXED_SCALAR * (i + 1) as f64) as QuantityRaw)
545 );
546 }
547
548 let ask_sizes: Vec<_> = (0..DEPTH10_LEN)
550 .map(|i| {
551 columns[3 * DEPTH10_LEN + i]
552 .as_any()
553 .downcast_ref::<FixedSizeBinaryArray>()
554 .unwrap()
555 })
556 .collect();
557
558 for (i, ask_size) in ask_sizes.iter().enumerate() {
559 assert_eq!(ask_size.len(), 1);
560 assert_eq!(
561 get_raw_quantity(ask_size.value(0)),
562 ((100.0 * FIXED_SCALAR * ((i + 1) as f64)) as QuantityRaw)
563 );
564 }
565
566 let bid_counts: Vec<_> = (0..DEPTH10_LEN)
568 .map(|i| {
569 columns[4 * DEPTH10_LEN + i]
570 .as_any()
571 .downcast_ref::<UInt32Array>()
572 .unwrap()
573 })
574 .collect();
575
576 for count_values in bid_counts {
577 assert_eq!(count_values.len(), 1);
578 assert_eq!(count_values.value(0), 1);
579 }
580
581 let ask_counts: Vec<_> = (0..DEPTH10_LEN)
583 .map(|i| {
584 columns[5 * DEPTH10_LEN + i]
585 .as_any()
586 .downcast_ref::<UInt32Array>()
587 .unwrap()
588 })
589 .collect();
590
591 for count_values in ask_counts {
592 assert_eq!(count_values.len(), 1);
593 assert_eq!(count_values.value(0), 1);
594 }
595
596 let flags_values = columns[6 * DEPTH10_LEN]
598 .as_any()
599 .downcast_ref::<UInt8Array>()
600 .unwrap();
601 let sequence_values = columns[6 * DEPTH10_LEN + 1]
602 .as_any()
603 .downcast_ref::<UInt64Array>()
604 .unwrap();
605 let ts_event_values = columns[6 * DEPTH10_LEN + 2]
606 .as_any()
607 .downcast_ref::<UInt64Array>()
608 .unwrap();
609 let ts_init_values = columns[6 * DEPTH10_LEN + 3]
610 .as_any()
611 .downcast_ref::<UInt64Array>()
612 .unwrap();
613
614 assert_eq!(flags_values.len(), 1);
615 assert_eq!(flags_values.value(0), 0);
616 assert_eq!(sequence_values.len(), 1);
617 assert_eq!(sequence_values.value(0), 0);
618 assert_eq!(ts_event_values.len(), 1);
619 assert_eq!(ts_event_values.value(0), 1);
620 assert_eq!(ts_init_values.len(), 1);
621 assert_eq!(ts_init_values.value(0), 2);
622 }
623
624 #[rstest]
625 fn test_decode_batch(stub_depth10: OrderBookDepth10) {
626 let instrument_id = InstrumentId::from("AAPL.XNAS");
627 let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
628
629 let data = vec![stub_depth10];
630 let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
631 let decoded_data = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
632
633 assert_eq!(decoded_data.len(), 1);
634 }
635
636 #[rstest]
637 fn test_decode_batch_missing_instrument_id_returns_error(stub_depth10: OrderBookDepth10) {
638 let instrument_id = InstrumentId::from("AAPL.XNAS");
639 let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
640 let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
641
642 metadata.remove(KEY_INSTRUMENT_ID);
643
644 let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
645 assert!(result.is_err());
646 let err = result.unwrap_err();
647 assert!(
648 err.to_string().contains("instrument_id"),
649 "Expected missing instrument_id error, got: {err}"
650 );
651 }
652
653 #[rstest]
654 fn test_decode_batch_missing_price_precision_returns_error(stub_depth10: OrderBookDepth10) {
655 let instrument_id = InstrumentId::from("AAPL.XNAS");
656 let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
657 let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
658
659 metadata.remove(KEY_PRICE_PRECISION);
660
661 let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
662 assert!(result.is_err());
663 let err = result.unwrap_err();
664 assert!(
665 err.to_string().contains("price_precision"),
666 "Expected missing price_precision error, got: {err}"
667 );
668 }
669
670 #[rstest]
671 fn test_encode_decode_round_trip(stub_depth10: OrderBookDepth10) {
672 let instrument_id = InstrumentId::from("AAPL.XNAS");
673 let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
674
675 let original = vec![stub_depth10];
676 let record_batch = OrderBookDepth10::encode_batch(&metadata, &original).unwrap();
677 let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
678
679 assert_eq!(decoded.len(), original.len());
680 let orig = &original[0];
681 let dec = &decoded[0];
682
683 assert_eq!(dec.instrument_id, orig.instrument_id);
684 assert_eq!(dec.flags, orig.flags);
685 assert_eq!(dec.sequence, orig.sequence);
686 assert_eq!(dec.ts_event, orig.ts_event);
687 assert_eq!(dec.ts_init, orig.ts_init);
688
689 for i in 0..DEPTH10_LEN {
690 assert_eq!(
691 dec.bids[i].price, orig.bids[i].price,
692 "bid price mismatch at level {i}"
693 );
694 assert_eq!(
695 dec.bids[i].size, orig.bids[i].size,
696 "bid size mismatch at level {i}"
697 );
698 assert_eq!(
699 dec.asks[i].price, orig.asks[i].price,
700 "ask price mismatch at level {i}"
701 );
702 assert_eq!(
703 dec.asks[i].size, orig.asks[i].size,
704 "ask size mismatch at level {i}"
705 );
706 }
707 }
708}