1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt64Array},
20 datatypes::{DataType, Field, Schema},
21 error::ArrowError,
22 record_batch::RecordBatch,
23};
24use nautilus_model::{
25 data::{BookOrder, OrderBookDelta},
26 enums::{BookAction, FromU8, OrderSide},
27 identifiers::InstrumentId,
28 types::fixed::PRECISION_BYTES,
29};
30
31use super::{
32 DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
33 KEY_SIZE_PRECISION, decode_price_with_sentinel, decode_quantity_with_sentinel, extract_column,
34 validate_precision_bytes,
35};
36use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
37
38impl ArrowSchemaProvider for OrderBookDelta {
39 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
40 let fields = vec![
41 Field::new("action", DataType::UInt8, false),
42 Field::new("side", DataType::UInt8, false),
43 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
45 Field::new("order_id", DataType::UInt64, false),
46 Field::new("flags", DataType::UInt8, false),
47 Field::new("sequence", DataType::UInt64, false),
48 Field::new("ts_event", DataType::UInt64, false),
49 Field::new("ts_init", DataType::UInt64, false),
50 ];
51
52 match metadata {
53 Some(metadata) => Schema::new_with_metadata(fields, metadata),
54 None => Schema::new(fields),
55 }
56 }
57}
58
59fn parse_metadata(
60 metadata: &HashMap<String, String>,
61) -> Result<(InstrumentId, u8, u8), EncodingError> {
62 let instrument_id_str = metadata
63 .get(KEY_INSTRUMENT_ID)
64 .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
65 let instrument_id = InstrumentId::from_str(instrument_id_str)
66 .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
67
68 let price_precision = metadata
69 .get(KEY_PRICE_PRECISION)
70 .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
71 .parse::<u8>()
72 .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
73
74 let size_precision = metadata
75 .get(KEY_SIZE_PRECISION)
76 .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
77 .parse::<u8>()
78 .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
79
80 Ok((instrument_id, price_precision, size_precision))
81}
82
83impl EncodeToRecordBatch for OrderBookDelta {
84 fn encode_batch(
85 metadata: &HashMap<String, String>,
86 data: &[Self],
87 ) -> Result<RecordBatch, ArrowError> {
88 let mut action_builder = UInt8Array::builder(data.len());
89 let mut side_builder = UInt8Array::builder(data.len());
90 let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
91 let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
92 let mut order_id_builder = UInt64Array::builder(data.len());
93 let mut flags_builder = UInt8Array::builder(data.len());
94 let mut sequence_builder = UInt64Array::builder(data.len());
95 let mut ts_event_builder = UInt64Array::builder(data.len());
96 let mut ts_init_builder = UInt64Array::builder(data.len());
97
98 for delta in data {
99 action_builder.append_value(delta.action as u8);
100 side_builder.append_value(delta.order.side as u8);
101 price_builder
102 .append_value(delta.order.price.raw.to_le_bytes())
103 .unwrap();
104 size_builder
105 .append_value(delta.order.size.raw.to_le_bytes())
106 .unwrap();
107 order_id_builder.append_value(delta.order.order_id);
108 flags_builder.append_value(delta.flags);
109 sequence_builder.append_value(delta.sequence);
110 ts_event_builder.append_value(delta.ts_event.as_u64());
111 ts_init_builder.append_value(delta.ts_init.as_u64());
112 }
113
114 let action_array = action_builder.finish();
115 let side_array = side_builder.finish();
116 let price_array = price_builder.finish();
117 let size_array = size_builder.finish();
118 let order_id_array = order_id_builder.finish();
119 let flags_array = flags_builder.finish();
120 let sequence_array = sequence_builder.finish();
121 let ts_event_array = ts_event_builder.finish();
122 let ts_init_array = ts_init_builder.finish();
123
124 RecordBatch::try_new(
125 Self::get_schema(Some(metadata.clone())).into(),
126 vec![
127 Arc::new(action_array),
128 Arc::new(side_array),
129 Arc::new(price_array),
130 Arc::new(size_array),
131 Arc::new(order_id_array),
132 Arc::new(flags_array),
133 Arc::new(sequence_array),
134 Arc::new(ts_event_array),
135 Arc::new(ts_init_array),
136 ],
137 )
138 }
139
140 fn metadata(&self) -> HashMap<String, String> {
141 Self::get_metadata(
142 &self.instrument_id,
143 self.order.price.precision,
144 self.order.size.precision,
145 )
146 }
147
148 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
152 let delta = chunk
153 .first()
154 .expect("Chunk should have at least one element to encode");
155
156 if delta.order.price.precision == 0
157 && delta.order.size.precision == 0
158 && let Some(delta) = chunk.get(1)
159 {
160 return EncodeToRecordBatch::metadata(delta);
161 }
162
163 EncodeToRecordBatch::metadata(delta)
164 }
165}
166
167impl DecodeFromRecordBatch for OrderBookDelta {
168 fn decode_batch(
169 metadata: &HashMap<String, String>,
170 record_batch: RecordBatch,
171 ) -> Result<Vec<Self>, EncodingError> {
172 let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
173 let cols = record_batch.columns();
174
175 let action_values = extract_column::<UInt8Array>(cols, "action", 0, DataType::UInt8)?;
176 let side_values = extract_column::<UInt8Array>(cols, "side", 1, DataType::UInt8)?;
177 let price_values = extract_column::<FixedSizeBinaryArray>(
178 cols,
179 "price",
180 2,
181 DataType::FixedSizeBinary(PRECISION_BYTES),
182 )?;
183 let size_values = extract_column::<FixedSizeBinaryArray>(
184 cols,
185 "size",
186 3,
187 DataType::FixedSizeBinary(PRECISION_BYTES),
188 )?;
189 let order_id_values = extract_column::<UInt64Array>(cols, "order_id", 4, DataType::UInt64)?;
190 let flags_values = extract_column::<UInt8Array>(cols, "flags", 5, DataType::UInt8)?;
191 let sequence_values = extract_column::<UInt64Array>(cols, "sequence", 6, DataType::UInt64)?;
192 let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 7, DataType::UInt64)?;
193 let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 8, DataType::UInt64)?;
194
195 validate_precision_bytes(price_values, "price")?;
196 validate_precision_bytes(size_values, "size")?;
197
198 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
199 .map(|i| {
200 let action_value = action_values.value(i);
201 let action = BookAction::from_u8(action_value).ok_or_else(|| {
202 EncodingError::ParseError(
203 stringify!(BookAction),
204 format!("Invalid enum value, was {action_value}"),
205 )
206 })?;
207 let side_value = side_values.value(i);
208 let side = OrderSide::from_u8(side_value).ok_or_else(|| {
209 EncodingError::ParseError(
210 stringify!(OrderSide),
211 format!("Invalid enum value, was {side_value}"),
212 )
213 })?;
214 let price =
215 decode_price_with_sentinel(price_values.value(i), price_precision, "price", i)?;
216 let size =
217 decode_quantity_with_sentinel(size_values.value(i), size_precision, "size", i)?;
218 let order_id = order_id_values.value(i);
219 let flags = flags_values.value(i);
220 let sequence = sequence_values.value(i);
221 let ts_event = ts_event_values.value(i).into();
222 let ts_init = ts_init_values.value(i).into();
223
224 Ok(Self {
225 instrument_id,
226 action,
227 order: BookOrder {
228 side,
229 price,
230 size,
231 order_id,
232 },
233 flags,
234 sequence,
235 ts_event,
236 ts_init,
237 })
238 })
239 .collect();
240
241 result
242 }
243}
244
245impl DecodeDataFromRecordBatch for OrderBookDelta {
246 fn decode_data_batch(
247 metadata: &HashMap<String, String>,
248 record_batch: RecordBatch,
249 ) -> Result<Vec<Data>, EncodingError> {
250 let deltas: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
251 Ok(deltas.into_iter().map(Data::from).collect())
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 use std::sync::Arc;
258
259 use arrow::{array::Array, record_batch::RecordBatch};
260 use nautilus_model::types::{
261 Price, Quantity,
262 fixed::FIXED_SCALAR,
263 price::{PRICE_UNDEF, PriceRaw},
264 quantity::{QUANTITY_UNDEF, QuantityRaw},
265 };
266 use pretty_assertions::assert_eq;
267 use rstest::rstest;
268
269 use super::*;
270 use crate::arrow::get_raw_price;
271
272 #[rstest]
273 fn test_get_schema() {
274 let instrument_id = InstrumentId::from("AAPL.XNAS");
275 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
276 let schema = OrderBookDelta::get_schema(Some(metadata.clone()));
277
278 let expected_fields = vec![
279 Field::new("action", DataType::UInt8, false),
280 Field::new("side", DataType::UInt8, false),
281 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
282 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
283 Field::new("order_id", DataType::UInt64, false),
284 Field::new("flags", DataType::UInt8, false),
285 Field::new("sequence", DataType::UInt64, false),
286 Field::new("ts_event", DataType::UInt64, false),
287 Field::new("ts_init", DataType::UInt64, false),
288 ];
289
290 let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
291 assert_eq!(schema, expected_schema);
292 }
293
294 #[rstest]
295 fn test_get_schema_map() {
296 let schema_map = OrderBookDelta::get_schema_map();
297 let fixed_size_binary = format!("FixedSizeBinary({PRECISION_BYTES})");
298
299 assert_eq!(schema_map.get("action").unwrap(), "UInt8");
300 assert_eq!(schema_map.get("side").unwrap(), "UInt8");
301 assert_eq!(*schema_map.get("price").unwrap(), fixed_size_binary);
302 assert_eq!(*schema_map.get("size").unwrap(), fixed_size_binary);
303 assert_eq!(schema_map.get("order_id").unwrap(), "UInt64");
304 assert_eq!(schema_map.get("flags").unwrap(), "UInt8");
305 assert_eq!(schema_map.get("sequence").unwrap(), "UInt64");
306 assert_eq!(schema_map.get("ts_event").unwrap(), "UInt64");
307 assert_eq!(schema_map.get("ts_init").unwrap(), "UInt64");
308 }
309
310 #[rstest]
311 fn test_encode_batch() {
312 let instrument_id = InstrumentId::from("AAPL.XNAS");
313 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
314
315 let delta1 = OrderBookDelta {
316 instrument_id,
317 action: BookAction::Add,
318 order: BookOrder {
319 side: OrderSide::Buy,
320 price: Price::from("100.10"),
321 size: Quantity::from(100),
322 order_id: 1,
323 },
324 flags: 0,
325 sequence: 1,
326 ts_event: 1.into(),
327 ts_init: 3.into(),
328 };
329
330 let delta2 = OrderBookDelta {
331 instrument_id,
332 action: BookAction::Update,
333 order: BookOrder {
334 side: OrderSide::Sell,
335 price: Price::from("101.20"),
336 size: Quantity::from(200),
337 order_id: 2,
338 },
339 flags: 1,
340 sequence: 2,
341 ts_event: 2.into(),
342 ts_init: 4.into(),
343 };
344
345 let data = vec![delta1, delta2];
346 let record_batch = OrderBookDelta::encode_batch(&metadata, &data).unwrap();
347
348 let columns = record_batch.columns();
349 let action_values = columns[0].as_any().downcast_ref::<UInt8Array>().unwrap();
350 let side_values = columns[1].as_any().downcast_ref::<UInt8Array>().unwrap();
351 let price_values = columns[2]
352 .as_any()
353 .downcast_ref::<FixedSizeBinaryArray>()
354 .unwrap();
355 let size_values = columns[3]
356 .as_any()
357 .downcast_ref::<FixedSizeBinaryArray>()
358 .unwrap();
359 let order_id_values = columns[4].as_any().downcast_ref::<UInt64Array>().unwrap();
360 let flags_values = columns[5].as_any().downcast_ref::<UInt8Array>().unwrap();
361 let sequence_values = columns[6].as_any().downcast_ref::<UInt64Array>().unwrap();
362 let ts_event_values = columns[7].as_any().downcast_ref::<UInt64Array>().unwrap();
363 let ts_init_values = columns[8].as_any().downcast_ref::<UInt64Array>().unwrap();
364
365 assert_eq!(columns.len(), 9);
366 assert_eq!(action_values.len(), 2);
367 assert_eq!(action_values.value(0), 1);
368 assert_eq!(action_values.value(1), 2);
369 assert_eq!(side_values.len(), 2);
370 assert_eq!(side_values.value(0), 1);
371 assert_eq!(side_values.value(1), 2);
372
373 assert_eq!(price_values.len(), 2);
374 assert_eq!(
375 get_raw_price(price_values.value(0)),
376 (100.10 * FIXED_SCALAR) as PriceRaw
377 );
378 assert_eq!(
379 get_raw_price(price_values.value(1)),
380 (101.20 * FIXED_SCALAR) as PriceRaw
381 );
382
383 assert_eq!(size_values.len(), 2);
384 assert_eq!(
385 get_raw_price(size_values.value(0)),
386 (100.0 * FIXED_SCALAR) as PriceRaw
387 );
388 assert_eq!(
389 get_raw_price(size_values.value(1)),
390 (200.0 * FIXED_SCALAR) as PriceRaw
391 );
392 assert_eq!(order_id_values.len(), 2);
393 assert_eq!(order_id_values.value(0), 1);
394 assert_eq!(order_id_values.value(1), 2);
395 assert_eq!(flags_values.len(), 2);
396 assert_eq!(flags_values.value(0), 0);
397 assert_eq!(flags_values.value(1), 1);
398 assert_eq!(sequence_values.len(), 2);
399 assert_eq!(sequence_values.value(0), 1);
400 assert_eq!(sequence_values.value(1), 2);
401 assert_eq!(ts_event_values.len(), 2);
402 assert_eq!(ts_event_values.value(0), 1);
403 assert_eq!(ts_event_values.value(1), 2);
404 assert_eq!(ts_init_values.len(), 2);
405 assert_eq!(ts_init_values.value(0), 3);
406 assert_eq!(ts_init_values.value(1), 4);
407 }
408
409 #[rstest]
410 fn test_decode_batch() {
411 let instrument_id = InstrumentId::from("AAPL.XNAS");
412 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
413
414 let action = UInt8Array::from(vec![1, 2]);
415 let side = UInt8Array::from(vec![1, 1]);
416 let price = FixedSizeBinaryArray::from(vec![
417 &((101.10 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
418 &((101.20 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
419 ]);
420 let size = FixedSizeBinaryArray::from(vec![
421 &((10000.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
422 &((9000.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
423 ]);
424 let order_id = UInt64Array::from(vec![1, 2]);
425 let flags = UInt8Array::from(vec![0, 0]);
426 let sequence = UInt64Array::from(vec![1, 2]);
427 let ts_event = UInt64Array::from(vec![1, 2]);
428 let ts_init = UInt64Array::from(vec![3, 4]);
429
430 let record_batch = RecordBatch::try_new(
431 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
432 vec![
433 Arc::new(action),
434 Arc::new(side),
435 Arc::new(price),
436 Arc::new(size),
437 Arc::new(order_id),
438 Arc::new(flags),
439 Arc::new(sequence),
440 Arc::new(ts_event),
441 Arc::new(ts_init),
442 ],
443 )
444 .unwrap();
445
446 let decoded_data = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
447 assert_eq!(decoded_data.len(), 2);
448 }
449
450 #[rstest]
451 fn test_decode_batch_with_undef_values() {
452 let instrument_id = InstrumentId::from("PLTR.XNAS");
453 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
454
455 let action = UInt8Array::from(vec![4, 1]); let side = UInt8Array::from(vec![0, 1]); let price = FixedSizeBinaryArray::from(vec![
459 &PRICE_UNDEF.to_le_bytes(),
460 &((100.50 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
461 ]);
462 let size = FixedSizeBinaryArray::from(vec![
463 &QUANTITY_UNDEF.to_le_bytes(),
464 &((1000.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
465 ]);
466 let order_id = UInt64Array::from(vec![0, 1]);
467 let flags = UInt8Array::from(vec![0, 0]);
468 let sequence = UInt64Array::from(vec![1, 2]);
469 let ts_event = UInt64Array::from(vec![1, 2]);
470 let ts_init = UInt64Array::from(vec![3, 4]);
471
472 let record_batch = RecordBatch::try_new(
473 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
474 vec![
475 Arc::new(action),
476 Arc::new(side),
477 Arc::new(price),
478 Arc::new(size),
479 Arc::new(order_id),
480 Arc::new(flags),
481 Arc::new(sequence),
482 Arc::new(ts_event),
483 Arc::new(ts_init),
484 ],
485 )
486 .unwrap();
487
488 let decoded_data = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
489 assert_eq!(decoded_data.len(), 2);
490 assert_eq!(decoded_data[0].order.price.raw, PRICE_UNDEF);
491 assert_eq!(decoded_data[0].order.price.precision, 0);
492 assert_eq!(decoded_data[0].order.size.raw, QUANTITY_UNDEF);
493 assert_eq!(decoded_data[0].order.size.precision, 0);
494 assert_eq!(decoded_data[1].order.price.precision, 2);
495 assert_eq!(decoded_data[1].order.size.precision, 0);
496 }
497
498 #[rstest]
499 fn test_decode_batch_invalid_price_returns_error() {
500 let instrument_id = InstrumentId::from("AAPL.XNAS");
501 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
502
503 let action = UInt8Array::from(vec![1]);
504 let side = UInt8Array::from(vec![1]);
505
506 let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
507 let price = FixedSizeBinaryArray::from(vec![&invalid_price.to_le_bytes()]);
508 let size = FixedSizeBinaryArray::from(vec![
509 &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
510 ]);
511 let order_id = UInt64Array::from(vec![1]);
512 let flags = UInt8Array::from(vec![0]);
513 let sequence = UInt64Array::from(vec![1]);
514 let ts_event = UInt64Array::from(vec![1]);
515 let ts_init = UInt64Array::from(vec![2]);
516
517 let record_batch = RecordBatch::try_new(
518 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
519 vec![
520 Arc::new(action),
521 Arc::new(side),
522 Arc::new(price),
523 Arc::new(size),
524 Arc::new(order_id),
525 Arc::new(flags),
526 Arc::new(sequence),
527 Arc::new(ts_event),
528 Arc::new(ts_init),
529 ],
530 )
531 .unwrap();
532
533 let result = OrderBookDelta::decode_batch(&metadata, record_batch);
534 assert!(result.is_err());
535 let err = result.unwrap_err();
536 assert!(
537 err.to_string().contains("price") && err.to_string().contains("row 0"),
538 "Expected price error at row 0, was: {err}"
539 );
540 }
541
542 #[rstest]
543 fn test_decode_batch_invalid_action_returns_error() {
544 let instrument_id = InstrumentId::from("AAPL.XNAS");
545 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
546
547 let action = UInt8Array::from(vec![99]);
548 let side = UInt8Array::from(vec![1]);
549 let price =
550 FixedSizeBinaryArray::from(vec![&((100.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes()]);
551 let size = FixedSizeBinaryArray::from(vec![
552 &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
553 ]);
554 let order_id = UInt64Array::from(vec![1]);
555 let flags = UInt8Array::from(vec![0]);
556 let sequence = UInt64Array::from(vec![1]);
557 let ts_event = UInt64Array::from(vec![1]);
558 let ts_init = UInt64Array::from(vec![2]);
559
560 let record_batch = RecordBatch::try_new(
561 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
562 vec![
563 Arc::new(action),
564 Arc::new(side),
565 Arc::new(price),
566 Arc::new(size),
567 Arc::new(order_id),
568 Arc::new(flags),
569 Arc::new(sequence),
570 Arc::new(ts_event),
571 Arc::new(ts_init),
572 ],
573 )
574 .unwrap();
575
576 let result = OrderBookDelta::decode_batch(&metadata, record_batch);
577 assert!(result.is_err());
578 let err = result.unwrap_err();
579 assert!(
580 err.to_string().contains("BookAction"),
581 "Expected BookAction error, was: {err}"
582 );
583 }
584
585 #[rstest]
586 fn test_decode_batch_missing_instrument_id_returns_error() {
587 let instrument_id = InstrumentId::from("AAPL.XNAS");
588 let mut metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
589 metadata.remove(KEY_INSTRUMENT_ID);
590
591 let action = UInt8Array::from(vec![1]);
592 let side = UInt8Array::from(vec![1]);
593 let price =
594 FixedSizeBinaryArray::from(vec![&((100.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes()]);
595 let size = FixedSizeBinaryArray::from(vec![
596 &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
597 ]);
598 let order_id = UInt64Array::from(vec![1]);
599 let flags = UInt8Array::from(vec![0]);
600 let sequence = UInt64Array::from(vec![1]);
601 let ts_event = UInt64Array::from(vec![1]);
602 let ts_init = UInt64Array::from(vec![2]);
603
604 let record_batch = RecordBatch::try_new(
605 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
606 vec![
607 Arc::new(action),
608 Arc::new(side),
609 Arc::new(price),
610 Arc::new(size),
611 Arc::new(order_id),
612 Arc::new(flags),
613 Arc::new(sequence),
614 Arc::new(ts_event),
615 Arc::new(ts_init),
616 ],
617 )
618 .unwrap();
619
620 let result = OrderBookDelta::decode_batch(&metadata, record_batch);
621 assert!(result.is_err());
622 let err = result.unwrap_err();
623 assert!(
624 err.to_string().contains("instrument_id"),
625 "Expected missing instrument_id error, was: {err}"
626 );
627 }
628
629 #[rstest]
630 fn test_encode_decode_round_trip() {
631 let instrument_id = InstrumentId::from("AAPL.XNAS");
632 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
633
634 let delta1 = OrderBookDelta {
635 instrument_id,
636 action: BookAction::Add,
637 order: BookOrder {
638 side: OrderSide::Buy,
639 price: Price::from("100.10"),
640 size: Quantity::from(100),
641 order_id: 1,
642 },
643 flags: 0,
644 sequence: 1,
645 ts_event: 1_000_000_000.into(),
646 ts_init: 1_000_000_001.into(),
647 };
648
649 let delta2 = OrderBookDelta {
650 instrument_id,
651 action: BookAction::Update,
652 order: BookOrder {
653 side: OrderSide::Sell,
654 price: Price::from("101.20"),
655 size: Quantity::from(200),
656 order_id: 2,
657 },
658 flags: 1,
659 sequence: 2,
660 ts_event: 2_000_000_000.into(),
661 ts_init: 2_000_000_001.into(),
662 };
663
664 let original = vec![delta1, delta2];
665 let record_batch = OrderBookDelta::encode_batch(&metadata, &original).unwrap();
666 let decoded = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
667
668 assert_eq!(decoded.len(), original.len());
669 for (orig, dec) in original.iter().zip(decoded.iter()) {
670 assert_eq!(dec.instrument_id, orig.instrument_id);
671 assert_eq!(dec.action, orig.action);
672 assert_eq!(dec.order.side, orig.order.side);
673 assert_eq!(dec.order.price, orig.order.price);
674 assert_eq!(dec.order.size, orig.order.size);
675 assert_eq!(dec.order.order_id, orig.order.order_id);
676 assert_eq!(dec.flags, orig.flags);
677 assert_eq!(dec.sequence, orig.sequence);
678 assert_eq!(dec.ts_event, orig.ts_event);
679 assert_eq!(dec.ts_init, orig.ts_init);
680 }
681 }
682}