nautilus_serialization/arrow/
delta.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt64Array},
20    datatypes::{DataType, Field, Schema},
21    error::ArrowError,
22    record_batch::RecordBatch,
23};
24use nautilus_model::{
25    data::{BookOrder, OrderBookDelta},
26    enums::{BookAction, FromU8, OrderSide},
27    identifiers::InstrumentId,
28    types::fixed::PRECISION_BYTES,
29};
30
31use super::{
32    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
33    KEY_SIZE_PRECISION, decode_price_with_sentinel, decode_quantity_with_sentinel, extract_column,
34};
35use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
36
37impl ArrowSchemaProvider for OrderBookDelta {
38    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
39        let fields = vec![
40            Field::new("action", DataType::UInt8, false),
41            Field::new("side", DataType::UInt8, false),
42            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
43            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44            Field::new("order_id", DataType::UInt64, false),
45            Field::new("flags", DataType::UInt8, false),
46            Field::new("sequence", DataType::UInt64, false),
47            Field::new("ts_event", DataType::UInt64, false),
48            Field::new("ts_init", DataType::UInt64, false),
49        ];
50
51        match metadata {
52            Some(metadata) => Schema::new_with_metadata(fields, metadata),
53            None => Schema::new(fields),
54        }
55    }
56}
57
58fn parse_metadata(
59    metadata: &HashMap<String, String>,
60) -> Result<(InstrumentId, u8, u8), EncodingError> {
61    let instrument_id_str = metadata
62        .get(KEY_INSTRUMENT_ID)
63        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
64    let instrument_id = InstrumentId::from_str(instrument_id_str)
65        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
66
67    let price_precision = metadata
68        .get(KEY_PRICE_PRECISION)
69        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
70        .parse::<u8>()
71        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
72
73    let size_precision = metadata
74        .get(KEY_SIZE_PRECISION)
75        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
76        .parse::<u8>()
77        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
78
79    Ok((instrument_id, price_precision, size_precision))
80}
81
82impl EncodeToRecordBatch for OrderBookDelta {
83    fn encode_batch(
84        metadata: &HashMap<String, String>,
85        data: &[Self],
86    ) -> Result<RecordBatch, ArrowError> {
87        let mut action_builder = UInt8Array::builder(data.len());
88        let mut side_builder = UInt8Array::builder(data.len());
89        let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
90        let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
91        let mut order_id_builder = UInt64Array::builder(data.len());
92        let mut flags_builder = UInt8Array::builder(data.len());
93        let mut sequence_builder = UInt64Array::builder(data.len());
94        let mut ts_event_builder = UInt64Array::builder(data.len());
95        let mut ts_init_builder = UInt64Array::builder(data.len());
96
97        for delta in data {
98            action_builder.append_value(delta.action as u8);
99            side_builder.append_value(delta.order.side as u8);
100            price_builder
101                .append_value(delta.order.price.raw.to_le_bytes())
102                .unwrap();
103            size_builder
104                .append_value(delta.order.size.raw.to_le_bytes())
105                .unwrap();
106            order_id_builder.append_value(delta.order.order_id);
107            flags_builder.append_value(delta.flags);
108            sequence_builder.append_value(delta.sequence);
109            ts_event_builder.append_value(delta.ts_event.as_u64());
110            ts_init_builder.append_value(delta.ts_init.as_u64());
111        }
112
113        let action_array = action_builder.finish();
114        let side_array = side_builder.finish();
115        let price_array = price_builder.finish();
116        let size_array = size_builder.finish();
117        let order_id_array = order_id_builder.finish();
118        let flags_array = flags_builder.finish();
119        let sequence_array = sequence_builder.finish();
120        let ts_event_array = ts_event_builder.finish();
121        let ts_init_array = ts_init_builder.finish();
122
123        RecordBatch::try_new(
124            Self::get_schema(Some(metadata.clone())).into(),
125            vec![
126                Arc::new(action_array),
127                Arc::new(side_array),
128                Arc::new(price_array),
129                Arc::new(size_array),
130                Arc::new(order_id_array),
131                Arc::new(flags_array),
132                Arc::new(sequence_array),
133                Arc::new(ts_event_array),
134                Arc::new(ts_init_array),
135            ],
136        )
137    }
138
139    fn metadata(&self) -> HashMap<String, String> {
140        Self::get_metadata(
141            &self.instrument_id,
142            self.order.price.precision,
143            self.order.size.precision,
144        )
145    }
146
147    /// Extract metadata from first two deltas
148    ///
149    /// Use the second delta if the first one has 0 precision
150    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
151        let delta = chunk
152            .first()
153            .expect("Chunk should have at least one element to encode");
154
155        if delta.order.price.precision == 0
156            && delta.order.size.precision == 0
157            && let Some(delta) = chunk.get(1)
158        {
159            return EncodeToRecordBatch::metadata(delta);
160        }
161
162        EncodeToRecordBatch::metadata(delta)
163    }
164}
165
166impl DecodeFromRecordBatch for OrderBookDelta {
167    fn decode_batch(
168        metadata: &HashMap<String, String>,
169        record_batch: RecordBatch,
170    ) -> Result<Vec<Self>, EncodingError> {
171        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
172        let cols = record_batch.columns();
173
174        let action_values = extract_column::<UInt8Array>(cols, "action", 0, DataType::UInt8)?;
175        let side_values = extract_column::<UInt8Array>(cols, "side", 1, DataType::UInt8)?;
176        let price_values = extract_column::<FixedSizeBinaryArray>(
177            cols,
178            "price",
179            2,
180            DataType::FixedSizeBinary(PRECISION_BYTES),
181        )?;
182        let size_values = extract_column::<FixedSizeBinaryArray>(
183            cols,
184            "size",
185            3,
186            DataType::FixedSizeBinary(PRECISION_BYTES),
187        )?;
188        let order_id_values = extract_column::<UInt64Array>(cols, "order_id", 4, DataType::UInt64)?;
189        let flags_values = extract_column::<UInt8Array>(cols, "flags", 5, DataType::UInt8)?;
190        let sequence_values = extract_column::<UInt64Array>(cols, "sequence", 6, DataType::UInt64)?;
191        let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 7, DataType::UInt64)?;
192        let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 8, DataType::UInt64)?;
193
194        if price_values.value_length() != PRECISION_BYTES {
195            return Err(EncodingError::ParseError(
196                "price",
197                format!(
198                    "Invalid value length: expected {PRECISION_BYTES}, found {}",
199                    price_values.value_length()
200                ),
201            ));
202        }
203
204        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
205            .map(|i| {
206                let action_value = action_values.value(i);
207                let action = BookAction::from_u8(action_value).ok_or_else(|| {
208                    EncodingError::ParseError(
209                        stringify!(BookAction),
210                        format!("Invalid enum value, was {action_value}"),
211                    )
212                })?;
213                let side_value = side_values.value(i);
214                let side = OrderSide::from_u8(side_value).ok_or_else(|| {
215                    EncodingError::ParseError(
216                        stringify!(OrderSide),
217                        format!("Invalid enum value, was {side_value}"),
218                    )
219                })?;
220                let price =
221                    decode_price_with_sentinel(price_values.value(i), price_precision, "price", i)?;
222                let size =
223                    decode_quantity_with_sentinel(size_values.value(i), size_precision, "size", i)?;
224                let order_id = order_id_values.value(i);
225                let flags = flags_values.value(i);
226                let sequence = sequence_values.value(i);
227                let ts_event = ts_event_values.value(i).into();
228                let ts_init = ts_init_values.value(i).into();
229
230                Ok(Self {
231                    instrument_id,
232                    action,
233                    order: BookOrder {
234                        side,
235                        price,
236                        size,
237                        order_id,
238                    },
239                    flags,
240                    sequence,
241                    ts_event,
242                    ts_init,
243                })
244            })
245            .collect();
246
247        result
248    }
249}
250
251impl DecodeDataFromRecordBatch for OrderBookDelta {
252    fn decode_data_batch(
253        metadata: &HashMap<String, String>,
254        record_batch: RecordBatch,
255    ) -> Result<Vec<Data>, EncodingError> {
256        let deltas: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
257        Ok(deltas.into_iter().map(Data::from).collect())
258    }
259}
260
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{
        array::{Array, ArrayRef},
        record_batch::RecordBatch,
    };
    use nautilus_model::types::{
        Price, Quantity,
        fixed::FIXED_SCALAR,
        price::{PRICE_UNDEF, PriceRaw},
        quantity::{QUANTITY_UNDEF, QuantityRaw},
    };
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    use super::*;
    use crate::arrow::get_raw_price;

    /// Wraps the nine delta columns in a `RecordBatch` that carries the delta schema.
    fn batch_from_columns(
        metadata: &HashMap<String, String>,
        columns: Vec<ArrayRef>,
    ) -> RecordBatch {
        RecordBatch::try_new(
            OrderBookDelta::get_schema(Some(metadata.clone())).into(),
            columns,
        )
        .unwrap()
    }

    /// Builds a one-row batch whose only varying inputs are the action and raw price.
    ///
    /// The fixed columns are: side 1, size 100, order id 1, flags 0, sequence 1,
    /// `ts_event` 1 and `ts_init` 2.
    fn single_row_batch(
        metadata: &HashMap<String, String>,
        action: u8,
        price_raw: PriceRaw,
    ) -> RecordBatch {
        let price = FixedSizeBinaryArray::from(vec![&price_raw.to_le_bytes()]);
        let size = FixedSizeBinaryArray::from(vec![
            &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
        ]);

        batch_from_columns(
            metadata,
            vec![
                Arc::new(UInt8Array::from(vec![action])),
                Arc::new(UInt8Array::from(vec![1])),
                Arc::new(price),
                Arc::new(size),
                Arc::new(UInt64Array::from(vec![1])),
                Arc::new(UInt8Array::from(vec![0])),
                Arc::new(UInt64Array::from(vec![1])),
                Arc::new(UInt64Array::from(vec![1])),
                Arc::new(UInt64Array::from(vec![2])),
            ],
        )
    }

    /// Returns an Add/Buy delta and an Update/Sell delta stamped with the given
    /// `(ts_event, ts_init)` pairs.
    fn sample_deltas(instrument_id: InstrumentId, stamps: [(u64, u64); 2]) -> Vec<OrderBookDelta> {
        let [(event_1, init_1), (event_2, init_2)] = stamps;

        vec![
            OrderBookDelta {
                instrument_id,
                action: BookAction::Add,
                order: BookOrder {
                    side: OrderSide::Buy,
                    price: Price::from("100.10"),
                    size: Quantity::from(100),
                    order_id: 1,
                },
                flags: 0,
                sequence: 1,
                ts_event: event_1.into(),
                ts_init: init_1.into(),
            },
            OrderBookDelta {
                instrument_id,
                action: BookAction::Update,
                order: BookOrder {
                    side: OrderSide::Sell,
                    price: Price::from("101.20"),
                    size: Quantity::from(200),
                    order_id: 2,
                },
                flags: 1,
                sequence: 2,
                ts_event: event_2.into(),
                ts_init: init_2.into(),
            },
        ]
    }

    /// Downcasts column `index` of `batch` to the concrete array type `T`.
    fn column_as<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch.columns()[index]
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    /// Reads a little-endian raw quantity from one fixed-size binary cell.
    fn raw_quantity(bytes: &[u8]) -> QuantityRaw {
        QuantityRaw::from_le_bytes(
            bytes
                .try_into()
                .expect("size cell should be exactly as wide as QuantityRaw"),
        )
    }

    #[rstest]
    fn test_get_schema() {
        let metadata = OrderBookDelta::get_metadata(&InstrumentId::from("AAPL.XNAS"), 2, 0);
        let raw_value = || DataType::FixedSizeBinary(PRECISION_BYTES);

        let expected = Schema::new_with_metadata(
            vec![
                Field::new("action", DataType::UInt8, false),
                Field::new("side", DataType::UInt8, false),
                Field::new("price", raw_value(), false),
                Field::new("size", raw_value(), false),
                Field::new("order_id", DataType::UInt64, false),
                Field::new("flags", DataType::UInt8, false),
                Field::new("sequence", DataType::UInt64, false),
                Field::new("ts_event", DataType::UInt64, false),
                Field::new("ts_init", DataType::UInt64, false),
            ],
            metadata.clone(),
        );

        assert_eq!(OrderBookDelta::get_schema(Some(metadata)), expected);
    }

    #[rstest]
    fn test_get_schema_map() {
        let schema_map = OrderBookDelta::get_schema_map();
        let fixed_size_binary = format!("FixedSizeBinary({PRECISION_BYTES})");

        let expected = [
            ("action", "UInt8"),
            ("side", "UInt8"),
            ("price", fixed_size_binary.as_str()),
            ("size", fixed_size_binary.as_str()),
            ("order_id", "UInt64"),
            ("flags", "UInt8"),
            ("sequence", "UInt64"),
            ("ts_event", "UInt64"),
            ("ts_init", "UInt64"),
        ];

        for (column, data_type) in expected {
            assert_eq!(
                schema_map.get(column).unwrap(),
                data_type,
                "unexpected type for column {column}"
            );
        }
    }

    #[rstest]
    fn test_encode_batch() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
        let data = sample_deltas(instrument_id, [(1, 3), (2, 4)]);

        let record_batch = OrderBookDelta::encode_batch(&metadata, &data).unwrap();
        assert_eq!(record_batch.num_columns(), 9);

        let action_values = column_as::<UInt8Array>(&record_batch, 0);
        let side_values = column_as::<UInt8Array>(&record_batch, 1);
        let price_values = column_as::<FixedSizeBinaryArray>(&record_batch, 2);
        let size_values = column_as::<FixedSizeBinaryArray>(&record_batch, 3);
        let order_id_values = column_as::<UInt64Array>(&record_batch, 4);
        let flags_values = column_as::<UInt8Array>(&record_batch, 5);
        let sequence_values = column_as::<UInt64Array>(&record_batch, 6);
        let ts_event_values = column_as::<UInt64Array>(&record_batch, 7);
        let ts_init_values = column_as::<UInt64Array>(&record_batch, 8);

        assert_eq!(action_values.len(), 2);
        assert_eq!(action_values.value(0), 1);
        assert_eq!(action_values.value(1), 2);

        assert_eq!(side_values.len(), 2);
        assert_eq!(side_values.value(0), 1);
        assert_eq!(side_values.value(1), 2);

        assert_eq!(price_values.len(), 2);
        assert_eq!(
            get_raw_price(price_values.value(0)),
            (100.10 * FIXED_SCALAR) as PriceRaw
        );
        assert_eq!(
            get_raw_price(price_values.value(1)),
            (101.20 * FIXED_SCALAR) as PriceRaw
        );

        // Sizes are quantities, so read them back with the quantity raw type.
        assert_eq!(size_values.len(), 2);
        assert_eq!(
            raw_quantity(size_values.value(0)),
            (100.0 * FIXED_SCALAR) as QuantityRaw
        );
        assert_eq!(
            raw_quantity(size_values.value(1)),
            (200.0 * FIXED_SCALAR) as QuantityRaw
        );

        assert_eq!(order_id_values.len(), 2);
        assert_eq!(order_id_values.value(0), 1);
        assert_eq!(order_id_values.value(1), 2);

        assert_eq!(flags_values.len(), 2);
        assert_eq!(flags_values.value(0), 0);
        assert_eq!(flags_values.value(1), 1);

        assert_eq!(sequence_values.len(), 2);
        assert_eq!(sequence_values.value(0), 1);
        assert_eq!(sequence_values.value(1), 2);

        assert_eq!(ts_event_values.len(), 2);
        assert_eq!(ts_event_values.value(0), 1);
        assert_eq!(ts_event_values.value(1), 2);

        assert_eq!(ts_init_values.len(), 2);
        assert_eq!(ts_init_values.value(0), 3);
        assert_eq!(ts_init_values.value(1), 4);
    }

    #[rstest]
    fn test_decode_batch() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let price = FixedSizeBinaryArray::from(vec![
            &((101.10 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
            &((101.20 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
        ]);
        let size = FixedSizeBinaryArray::from(vec![
            &((10000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
            &((9000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
        ]);

        let record_batch = batch_from_columns(
            &metadata,
            vec![
                Arc::new(UInt8Array::from(vec![1, 2])),
                Arc::new(UInt8Array::from(vec![1, 1])),
                Arc::new(price),
                Arc::new(size),
                Arc::new(UInt64Array::from(vec![1, 2])),
                Arc::new(UInt8Array::from(vec![0, 0])),
                Arc::new(UInt64Array::from(vec![1, 2])),
                Arc::new(UInt64Array::from(vec![1, 2])),
                Arc::new(UInt64Array::from(vec![3, 4])),
            ],
        );

        let decoded = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
        assert_eq!(decoded.len(), 2);

        // Check the decoded fields as well as the row count.
        let (first, second) = (&decoded[0], &decoded[1]);
        assert_eq!(first.instrument_id, instrument_id);
        assert_eq!(second.instrument_id, instrument_id);
        assert_eq!(first.action, BookAction::Add);
        assert_eq!(second.action, BookAction::Update);
        assert_eq!(first.order.side, OrderSide::Buy);
        assert_eq!(second.order.side, OrderSide::Buy);
        assert_eq!(first.order.price.precision, 2);
        assert_eq!(second.order.price.precision, 2);
        assert_eq!(first.order.size.precision, 0);
        assert_eq!(second.order.size.precision, 0);
        assert_eq!(first.order.order_id, 1);
        assert_eq!(second.order.order_id, 2);
        assert_eq!(first.flags, 0);
        assert_eq!(second.flags, 0);
        assert_eq!(first.sequence, 1);
        assert_eq!(second.sequence, 2);
        assert_eq!(first.ts_event.as_u64(), 1);
        assert_eq!(second.ts_event.as_u64(), 2);
        assert_eq!(first.ts_init.as_u64(), 3);
        assert_eq!(second.ts_init.as_u64(), 4);
    }

    #[rstest]
    fn test_decode_batch_with_undef_values() {
        let instrument_id = InstrumentId::from("PLTR.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        // Row 0 is a Clear action carrying PRICE_UNDEF and QUANTITY_UNDEF;
        // row 1 is an ordinary Add.
        let price = FixedSizeBinaryArray::from(vec![
            &PRICE_UNDEF.to_le_bytes(),
            &((100.50 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
        ]);
        let size = FixedSizeBinaryArray::from(vec![
            &QUANTITY_UNDEF.to_le_bytes(),
            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
        ]);

        let record_batch = batch_from_columns(
            &metadata,
            vec![
                Arc::new(UInt8Array::from(vec![4, 1])), // 4 = Clear, 1 = Add
                Arc::new(UInt8Array::from(vec![0, 1])), // NoOrderSide for Clear, Buy for Add
                Arc::new(price),
                Arc::new(size),
                Arc::new(UInt64Array::from(vec![0, 1])),
                Arc::new(UInt8Array::from(vec![0, 0])),
                Arc::new(UInt64Array::from(vec![1, 2])),
                Arc::new(UInt64Array::from(vec![1, 2])),
                Arc::new(UInt64Array::from(vec![3, 4])),
            ],
        );

        let decoded = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
        assert_eq!(decoded.len(), 2);

        let (clear, add) = (&decoded[0], &decoded[1]);
        assert_eq!(clear.order.price.raw, PRICE_UNDEF);
        assert_eq!(clear.order.price.precision, 0);
        assert_eq!(clear.order.size.raw, QUANTITY_UNDEF);
        assert_eq!(clear.order.size.precision, 0);
        assert_eq!(add.order.price.precision, 2);
        assert_eq!(add.order.size.precision, 0);
    }

    #[rstest]
    fn test_decode_batch_invalid_price_returns_error() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
        let record_batch = single_row_batch(&metadata, 1, invalid_price);

        let result = OrderBookDelta::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("price") && err.to_string().contains("row 0"),
            "Expected price error at row 0, got: {err}"
        );
    }

    #[rstest]
    fn test_decode_batch_invalid_action_returns_error() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);

        let record_batch = single_row_batch(&metadata, 99, (100.0 * FIXED_SCALAR) as PriceRaw);

        let result = OrderBookDelta::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("BookAction"),
            "Expected BookAction error, got: {err}"
        );
    }

    #[rstest]
    fn test_decode_batch_missing_instrument_id_returns_error() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let mut metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
        metadata.remove(KEY_INSTRUMENT_ID);

        let record_batch = single_row_batch(&metadata, 1, (100.0 * FIXED_SCALAR) as PriceRaw);

        let result = OrderBookDelta::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("instrument_id"),
            "Expected missing instrument_id error, got: {err}"
        );
    }

    #[rstest]
    fn test_encode_decode_round_trip() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
        let original = sample_deltas(
            instrument_id,
            [
                (1_000_000_000, 1_000_000_001),
                (2_000_000_000, 2_000_000_001),
            ],
        );

        let record_batch = OrderBookDelta::encode_batch(&metadata, &original).unwrap();
        let decoded = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded.len(), original.len());
        for (orig, dec) in original.iter().zip(decoded.iter()) {
            assert_eq!(dec.instrument_id, orig.instrument_id);
            assert_eq!(dec.action, orig.action);
            assert_eq!(dec.order.side, orig.order.side);
            assert_eq!(dec.order.price, orig.order.price);
            assert_eq!(dec.order.size, orig.order.size);
            assert_eq!(dec.order.order_id, orig.order.order_id);
            assert_eq!(dec.flags, orig.flags);
            assert_eq!(dec.sequence, orig.sequence);
            assert_eq!(dec.ts_event, orig.ts_event);
            assert_eq!(dec.ts_init, orig.ts_init);
        }
    }
}