Skip to main content

nautilus_serialization/arrow/
delta.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt64Array},
20    datatypes::{DataType, Field, Schema},
21    error::ArrowError,
22    record_batch::RecordBatch,
23};
24use nautilus_model::{
25    data::{BookOrder, OrderBookDelta},
26    enums::{BookAction, FromU8, OrderSide},
27    identifiers::InstrumentId,
28    types::fixed::PRECISION_BYTES,
29};
30
31use super::{
32    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
33    KEY_SIZE_PRECISION, decode_price_with_sentinel, decode_quantity_with_sentinel, extract_column,
34    validate_precision_bytes,
35};
36use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
37
38impl ArrowSchemaProvider for OrderBookDelta {
39    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
40        let fields = vec![
41            Field::new("action", DataType::UInt8, false),
42            Field::new("side", DataType::UInt8, false),
43            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
45            Field::new("order_id", DataType::UInt64, false),
46            Field::new("flags", DataType::UInt8, false),
47            Field::new("sequence", DataType::UInt64, false),
48            Field::new("ts_event", DataType::UInt64, false),
49            Field::new("ts_init", DataType::UInt64, false),
50        ];
51
52        match metadata {
53            Some(metadata) => Schema::new_with_metadata(fields, metadata),
54            None => Schema::new(fields),
55        }
56    }
57}
58
59fn parse_metadata(
60    metadata: &HashMap<String, String>,
61) -> Result<(InstrumentId, u8, u8), EncodingError> {
62    let instrument_id_str = metadata
63        .get(KEY_INSTRUMENT_ID)
64        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
65    let instrument_id = InstrumentId::from_str(instrument_id_str)
66        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
67
68    let price_precision = metadata
69        .get(KEY_PRICE_PRECISION)
70        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
71        .parse::<u8>()
72        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
73
74    let size_precision = metadata
75        .get(KEY_SIZE_PRECISION)
76        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
77        .parse::<u8>()
78        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
79
80    Ok((instrument_id, price_precision, size_precision))
81}
82
83impl EncodeToRecordBatch for OrderBookDelta {
84    fn encode_batch(
85        metadata: &HashMap<String, String>,
86        data: &[Self],
87    ) -> Result<RecordBatch, ArrowError> {
88        let mut action_builder = UInt8Array::builder(data.len());
89        let mut side_builder = UInt8Array::builder(data.len());
90        let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
91        let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
92        let mut order_id_builder = UInt64Array::builder(data.len());
93        let mut flags_builder = UInt8Array::builder(data.len());
94        let mut sequence_builder = UInt64Array::builder(data.len());
95        let mut ts_event_builder = UInt64Array::builder(data.len());
96        let mut ts_init_builder = UInt64Array::builder(data.len());
97
98        for delta in data {
99            action_builder.append_value(delta.action as u8);
100            side_builder.append_value(delta.order.side as u8);
101            price_builder
102                .append_value(delta.order.price.raw.to_le_bytes())
103                .unwrap();
104            size_builder
105                .append_value(delta.order.size.raw.to_le_bytes())
106                .unwrap();
107            order_id_builder.append_value(delta.order.order_id);
108            flags_builder.append_value(delta.flags);
109            sequence_builder.append_value(delta.sequence);
110            ts_event_builder.append_value(delta.ts_event.as_u64());
111            ts_init_builder.append_value(delta.ts_init.as_u64());
112        }
113
114        let action_array = action_builder.finish();
115        let side_array = side_builder.finish();
116        let price_array = price_builder.finish();
117        let size_array = size_builder.finish();
118        let order_id_array = order_id_builder.finish();
119        let flags_array = flags_builder.finish();
120        let sequence_array = sequence_builder.finish();
121        let ts_event_array = ts_event_builder.finish();
122        let ts_init_array = ts_init_builder.finish();
123
124        RecordBatch::try_new(
125            Self::get_schema(Some(metadata.clone())).into(),
126            vec![
127                Arc::new(action_array),
128                Arc::new(side_array),
129                Arc::new(price_array),
130                Arc::new(size_array),
131                Arc::new(order_id_array),
132                Arc::new(flags_array),
133                Arc::new(sequence_array),
134                Arc::new(ts_event_array),
135                Arc::new(ts_init_array),
136            ],
137        )
138    }
139
140    fn metadata(&self) -> HashMap<String, String> {
141        Self::get_metadata(
142            &self.instrument_id,
143            self.order.price.precision,
144            self.order.size.precision,
145        )
146    }
147
148    /// Extract metadata from first two deltas
149    ///
150    /// Use the second delta if the first one has 0 precision
151    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
152        let delta = chunk
153            .first()
154            .expect("Chunk should have at least one element to encode");
155
156        if delta.order.price.precision == 0
157            && delta.order.size.precision == 0
158            && let Some(delta) = chunk.get(1)
159        {
160            return EncodeToRecordBatch::metadata(delta);
161        }
162
163        EncodeToRecordBatch::metadata(delta)
164    }
165}
166
167impl DecodeFromRecordBatch for OrderBookDelta {
168    fn decode_batch(
169        metadata: &HashMap<String, String>,
170        record_batch: RecordBatch,
171    ) -> Result<Vec<Self>, EncodingError> {
172        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
173        let cols = record_batch.columns();
174
175        let action_values = extract_column::<UInt8Array>(cols, "action", 0, DataType::UInt8)?;
176        let side_values = extract_column::<UInt8Array>(cols, "side", 1, DataType::UInt8)?;
177        let price_values = extract_column::<FixedSizeBinaryArray>(
178            cols,
179            "price",
180            2,
181            DataType::FixedSizeBinary(PRECISION_BYTES),
182        )?;
183        let size_values = extract_column::<FixedSizeBinaryArray>(
184            cols,
185            "size",
186            3,
187            DataType::FixedSizeBinary(PRECISION_BYTES),
188        )?;
189        let order_id_values = extract_column::<UInt64Array>(cols, "order_id", 4, DataType::UInt64)?;
190        let flags_values = extract_column::<UInt8Array>(cols, "flags", 5, DataType::UInt8)?;
191        let sequence_values = extract_column::<UInt64Array>(cols, "sequence", 6, DataType::UInt64)?;
192        let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 7, DataType::UInt64)?;
193        let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 8, DataType::UInt64)?;
194
195        validate_precision_bytes(price_values, "price")?;
196        validate_precision_bytes(size_values, "size")?;
197
198        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
199            .map(|i| {
200                let action_value = action_values.value(i);
201                let action = BookAction::from_u8(action_value).ok_or_else(|| {
202                    EncodingError::ParseError(
203                        stringify!(BookAction),
204                        format!("Invalid enum value, was {action_value}"),
205                    )
206                })?;
207                let side_value = side_values.value(i);
208                let side = OrderSide::from_u8(side_value).ok_or_else(|| {
209                    EncodingError::ParseError(
210                        stringify!(OrderSide),
211                        format!("Invalid enum value, was {side_value}"),
212                    )
213                })?;
214                let price =
215                    decode_price_with_sentinel(price_values.value(i), price_precision, "price", i)?;
216                let size =
217                    decode_quantity_with_sentinel(size_values.value(i), size_precision, "size", i)?;
218                let order_id = order_id_values.value(i);
219                let flags = flags_values.value(i);
220                let sequence = sequence_values.value(i);
221                let ts_event = ts_event_values.value(i).into();
222                let ts_init = ts_init_values.value(i).into();
223
224                Ok(Self {
225                    instrument_id,
226                    action,
227                    order: BookOrder {
228                        side,
229                        price,
230                        size,
231                        order_id,
232                    },
233                    flags,
234                    sequence,
235                    ts_event,
236                    ts_init,
237                })
238            })
239            .collect();
240
241        result
242    }
243}
244
245impl DecodeDataFromRecordBatch for OrderBookDelta {
246    fn decode_data_batch(
247        metadata: &HashMap<String, String>,
248        record_batch: RecordBatch,
249    ) -> Result<Vec<Data>, EncodingError> {
250        let deltas: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
251        Ok(deltas.into_iter().map(Data::from).collect())
252    }
253}
254
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{array::Array, record_batch::RecordBatch};
    use nautilus_model::types::{
        Price, Quantity,
        fixed::FIXED_SCALAR,
        price::{PRICE_UNDEF, PriceRaw},
        quantity::{QUANTITY_UNDEF, QuantityRaw},
    };
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    use super::*;
    use crate::arrow::get_raw_price;

    /// One hand-written row of raw column values, used to build record batches directly
    /// (without going through `encode_batch`) so decoding can be tested in isolation.
    struct RawRow {
        action: u8,
        side: u8,
        price: PriceRaw,
        size: QuantityRaw,
        order_id: u64,
        flags: u8,
        sequence: u64,
        ts_event: u64,
        ts_init: u64,
    }

    /// Scales a decimal value into a raw price.
    fn raw_price(value: f64) -> PriceRaw {
        (value * FIXED_SCALAR) as PriceRaw
    }

    /// Scales a decimal value into a raw quantity.
    fn raw_quantity(value: f64) -> QuantityRaw {
        (value * FIXED_SCALAR) as QuantityRaw
    }

    /// Reads a raw quantity back from the little-endian bytes of a `size` column entry.
    fn quantity_from_le_bytes(bytes: &[u8]) -> QuantityRaw {
        QuantityRaw::from_le_bytes(
            bytes
                .try_into()
                .expect("size entry should be exactly as wide as QuantityRaw"),
        )
    }

    /// A single well-formed row (action 1, side 1); error tests override one field of it.
    fn valid_row() -> RawRow {
        RawRow {
            action: 1,
            side: 1,
            price: raw_price(100.0),
            size: raw_quantity(100.0),
            order_id: 1,
            flags: 0,
            sequence: 1,
            ts_event: 1,
            ts_init: 2,
        }
    }

    /// Builds a record batch with the `OrderBookDelta` schema from raw rows.
    ///
    /// `rows` must be non-empty: the fixed-size binary columns take their width from the
    /// first value.
    fn record_batch_from_rows(metadata: &HashMap<String, String>, rows: &[RawRow]) -> RecordBatch {
        let action = UInt8Array::from(rows.iter().map(|row| row.action).collect::<Vec<u8>>());
        let side = UInt8Array::from(rows.iter().map(|row| row.side).collect::<Vec<u8>>());
        let price =
            FixedSizeBinaryArray::try_from_iter(rows.iter().map(|row| row.price.to_le_bytes()))
                .expect("rows should be non-empty");
        let size =
            FixedSizeBinaryArray::try_from_iter(rows.iter().map(|row| row.size.to_le_bytes()))
                .expect("rows should be non-empty");
        let order_id = UInt64Array::from(rows.iter().map(|row| row.order_id).collect::<Vec<u64>>());
        let flags = UInt8Array::from(rows.iter().map(|row| row.flags).collect::<Vec<u8>>());
        let sequence = UInt64Array::from(rows.iter().map(|row| row.sequence).collect::<Vec<u64>>());
        let ts_event = UInt64Array::from(rows.iter().map(|row| row.ts_event).collect::<Vec<u64>>());
        let ts_init = UInt64Array::from(rows.iter().map(|row| row.ts_init).collect::<Vec<u64>>());

        RecordBatch::try_new(
            OrderBookDelta::get_schema(Some(metadata.clone())).into(),
            vec![
                Arc::new(action),
                Arc::new(side),
                Arc::new(price),
                Arc::new(size),
                Arc::new(order_id),
                Arc::new(flags),
                Arc::new(sequence),
                Arc::new(ts_event),
                Arc::new(ts_init),
            ],
        )
        .unwrap()
    }

    /// Returns the column at `index` downcast to the concrete array type `T`.
    fn column<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect("column should have the expected array type")
    }

    /// Builds the two sample deltas (an Add/Buy and an Update/Sell) shared by the encode
    /// tests; each timestamp pair is `(ts_event, ts_init)`.
    fn sample_deltas(
        instrument_id: InstrumentId,
        first_ts: (u64, u64),
        second_ts: (u64, u64),
    ) -> Vec<OrderBookDelta> {
        let delta1 = OrderBookDelta {
            instrument_id,
            action: BookAction::Add,
            order: BookOrder {
                side: OrderSide::Buy,
                price: Price::from("100.10"),
                size: Quantity::from(100),
                order_id: 1,
            },
            flags: 0,
            sequence: 1,
            ts_event: first_ts.0.into(),
            ts_init: first_ts.1.into(),
        };

        let delta2 = OrderBookDelta {
            instrument_id,
            action: BookAction::Update,
            order: BookOrder {
                side: OrderSide::Sell,
                price: Price::from("101.20"),
                size: Quantity::from(200),
                order_id: 2,
            },
            flags: 1,
            sequence: 2,
            ts_event: second_ts.0.into(),
            ts_init: second_ts.1.into(),
        };

        vec![delta1, delta2]
    }

    #[rstest]
    fn test_get_schema() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));

        let expected_fields = vec![
            Field::new("action", DataType::UInt8, false),
            Field::new("side", DataType::UInt8, false),
            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
            Field::new("order_id", DataType::UInt64, false),
            Field::new("flags", DataType::UInt8, false),
            Field::new("sequence", DataType::UInt64, false),
            Field::new("ts_event", DataType::UInt64, false),
            Field::new("ts_init", DataType::UInt64, false),
        ];

        let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
        assert_eq!(schema, expected_schema);
    }

    #[rstest]
    fn test_get_schema_map() {
        let schema_map = OrderBookDelta::get_schema_map();
        let fixed_size_binary = format!("FixedSizeBinary({PRECISION_BYTES})");

        assert_eq!(schema_map.get("action").unwrap(), "UInt8");
        assert_eq!(schema_map.get("side").unwrap(), "UInt8");
        assert_eq!(*schema_map.get("price").unwrap(), fixed_size_binary);
        assert_eq!(*schema_map.get("size").unwrap(), fixed_size_binary);
        assert_eq!(schema_map.get("order_id").unwrap(), "UInt64");
        assert_eq!(schema_map.get("flags").unwrap(), "UInt8");
        assert_eq!(schema_map.get("sequence").unwrap(), "UInt64");
        assert_eq!(schema_map.get("ts_event").unwrap(), "UInt64");
        assert_eq!(schema_map.get("ts_init").unwrap(), "UInt64");
    }

    #[rstest]
    fn test_encode_batch() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let data = sample_deltas(instrument_id, (1, 3), (2, 4));
        let record_batch = OrderBookDelta::encode_batch(&metadata, &data).unwrap();

        // Check the column count first so a schema change fails here, not on an index panic.
        assert_eq!(record_batch.num_columns(), 9);

        let action_values = column::<UInt8Array>(&record_batch, 0);
        let side_values = column::<UInt8Array>(&record_batch, 1);
        let price_values = column::<FixedSizeBinaryArray>(&record_batch, 2);
        let size_values = column::<FixedSizeBinaryArray>(&record_batch, 3);
        let order_id_values = column::<UInt64Array>(&record_batch, 4);
        let flags_values = column::<UInt8Array>(&record_batch, 5);
        let sequence_values = column::<UInt64Array>(&record_batch, 6);
        let ts_event_values = column::<UInt64Array>(&record_batch, 7);
        let ts_init_values = column::<UInt64Array>(&record_batch, 8);

        assert_eq!(action_values.len(), 2);
        assert_eq!(action_values.value(0), 1);
        assert_eq!(action_values.value(1), 2);
        assert_eq!(side_values.len(), 2);
        assert_eq!(side_values.value(0), 1);
        assert_eq!(side_values.value(1), 2);

        assert_eq!(price_values.len(), 2);
        assert_eq!(get_raw_price(price_values.value(0)), raw_price(100.10));
        assert_eq!(get_raw_price(price_values.value(1)), raw_price(101.20));

        // Sizes are quantities, so read them back as `QuantityRaw` rather than `PriceRaw`.
        assert_eq!(size_values.len(), 2);
        assert_eq!(
            quantity_from_le_bytes(size_values.value(0)),
            raw_quantity(100.0)
        );
        assert_eq!(
            quantity_from_le_bytes(size_values.value(1)),
            raw_quantity(200.0)
        );

        assert_eq!(order_id_values.len(), 2);
        assert_eq!(order_id_values.value(0), 1);
        assert_eq!(order_id_values.value(1), 2);
        assert_eq!(flags_values.len(), 2);
        assert_eq!(flags_values.value(0), 0);
        assert_eq!(flags_values.value(1), 1);
        assert_eq!(sequence_values.len(), 2);
        assert_eq!(sequence_values.value(0), 1);
        assert_eq!(sequence_values.value(1), 2);
        assert_eq!(ts_event_values.len(), 2);
        assert_eq!(ts_event_values.value(0), 1);
        assert_eq!(ts_event_values.value(1), 2);
        assert_eq!(ts_init_values.len(), 2);
        assert_eq!(ts_init_values.value(0), 3);
        assert_eq!(ts_init_values.value(1), 4);
    }

    #[rstest]
    fn test_decode_batch() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let rows = [
            RawRow {
                action: 1,
                side: 1,
                price: raw_price(101.10),
                size: raw_quantity(10000.0),
                order_id: 1,
                flags: 0,
                sequence: 1,
                ts_event: 1,
                ts_init: 3,
            },
            RawRow {
                action: 2,
                side: 1,
                price: raw_price(101.20),
                size: raw_quantity(9000.0),
                order_id: 2,
                flags: 0,
                sequence: 2,
                ts_event: 2,
                ts_init: 4,
            },
        ];
        let record_batch = record_batch_from_rows(&metadata, &rows);

        let decoded_data = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
        assert_eq!(decoded_data.len(), 2);

        // Verify each decoded field so a column mix-up cannot pass unnoticed.
        let expected_actions = [BookAction::Add, BookAction::Update];
        for ((decoded, row), expected_action) in
            decoded_data.iter().zip(rows.iter()).zip(expected_actions)
        {
            assert_eq!(decoded.instrument_id, instrument_id);
            assert_eq!(decoded.action, expected_action);
            assert_eq!(decoded.order.side, OrderSide::Buy);
            assert_eq!(decoded.order.price.raw, row.price);
            assert_eq!(decoded.order.price.precision, 2);
            assert_eq!(decoded.order.size.raw, row.size);
            assert_eq!(decoded.order.size.precision, 0);
            assert_eq!(decoded.order.order_id, row.order_id);
            assert_eq!(decoded.flags, row.flags);
            assert_eq!(decoded.sequence, row.sequence);
            assert_eq!(decoded.ts_event.as_u64(), row.ts_event);
            assert_eq!(decoded.ts_init.as_u64(), row.ts_init);
        }
    }

    #[rstest]
    fn test_decode_batch_with_undef_values() {
        let instrument_id = InstrumentId::from("PLTR.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        // First row is a clear action (4) with no order side (0), carrying PRICE_UNDEF and
        // QUANTITY_UNDEF; second row is a regular Add (1) on the buy side (1).
        let record_batch = record_batch_from_rows(
            &metadata,
            &[
                RawRow {
                    action: 4,
                    side: 0,
                    price: PRICE_UNDEF,
                    size: QUANTITY_UNDEF,
                    order_id: 0,
                    flags: 0,
                    sequence: 1,
                    ts_event: 1,
                    ts_init: 3,
                },
                RawRow {
                    action: 1,
                    side: 1,
                    price: raw_price(100.50),
                    size: raw_quantity(1000.0),
                    order_id: 1,
                    flags: 0,
                    sequence: 2,
                    ts_event: 2,
                    ts_init: 4,
                },
            ],
        );

        let decoded_data = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
        assert_eq!(decoded_data.len(), 2);
        assert_eq!(decoded_data[0].order.price.raw, PRICE_UNDEF);
        assert_eq!(decoded_data[0].order.price.precision, 0);
        assert_eq!(decoded_data[0].order.size.raw, QUANTITY_UNDEF);
        assert_eq!(decoded_data[0].order.size.precision, 0);
        assert_eq!(decoded_data[1].order.price.precision, 2);
        assert_eq!(decoded_data[1].order.size.precision, 0);
    }

    #[rstest]
    fn test_decode_batch_invalid_price_returns_error() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let record_batch = record_batch_from_rows(
            &metadata,
            &[RawRow {
                price: PriceRaw::MAX - 1000,
                ..valid_row()
            }],
        );

        let result = OrderBookDelta::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("price") && err.to_string().contains("row 0"),
            "Expected price error at row 0, was: {err}"
        );
    }

    #[rstest]
    fn test_decode_batch_invalid_action_returns_error() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let record_batch = record_batch_from_rows(
            &metadata,
            &[RawRow {
                action: 99,
                ..valid_row()
            }],
        );

        let result = OrderBookDelta::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("BookAction"),
            "Expected BookAction error, was: {err}"
        );
    }

    #[rstest]
    fn test_decode_batch_missing_instrument_id_returns_error() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let mut metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
        metadata.remove(KEY_INSTRUMENT_ID);

        let record_batch = record_batch_from_rows(&metadata, &[valid_row()]);

        let result = OrderBookDelta::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("instrument_id"),
            "Expected missing instrument_id error, was: {err}"
        );
    }

    #[rstest]
    fn test_encode_decode_round_trip() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let original = sample_deltas(
            instrument_id,
            (1_000_000_000, 1_000_000_001),
            (2_000_000_000, 2_000_000_001),
        );
        let record_batch = OrderBookDelta::encode_batch(&metadata, &original).unwrap();
        let decoded = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded.len(), original.len());
        for (orig, dec) in original.iter().zip(decoded.iter()) {
            assert_eq!(dec.instrument_id, orig.instrument_id);
            assert_eq!(dec.action, orig.action);
            assert_eq!(dec.order.side, orig.order.side);
            assert_eq!(dec.order.price, orig.order.price);
            assert_eq!(dec.order.size, orig.order.size);
            assert_eq!(dec.order.order_id, orig.order.order_id);
            assert_eq!(dec.flags, orig.flags);
            assert_eq!(dec.sequence, orig.sequence);
            assert_eq!(dec.ts_event, orig.ts_event);
            assert_eq!(dec.ts_init, orig.ts_init);
        }
    }
}