Skip to main content

nautilus_serialization/arrow/
depth.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{
20        Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt32Array, UInt64Array,
21    },
22    datatypes::{DataType, Field, Schema},
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::{
27    data::{
28        depth::{DEPTH10_LEN, OrderBookDepth10},
29        order::BookOrder,
30    },
31    enums::OrderSide,
32    identifiers::InstrumentId,
33    types::fixed::PRECISION_BYTES,
34};
35
36use super::{
37    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
38    KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column, validate_precision_bytes,
39};
40use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
41
42fn get_field_data() -> Vec<(&'static str, DataType)> {
43    vec![
44        ("bid_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
45        ("ask_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
46        ("bid_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
47        ("ask_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
48        ("bid_count", DataType::UInt32),
49        ("ask_count", DataType::UInt32),
50    ]
51}
52
53impl ArrowSchemaProvider for OrderBookDepth10 {
54    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
55        let mut fields = Vec::new();
56        let field_data = get_field_data();
57
58        // Schema is of the form:
59        // bid_price_0, bid_price_1, ..., bid_price_9, ask_price_0, ask_price_1
60        for (name, data_type) in field_data {
61            for i in 0..DEPTH10_LEN {
62                fields.push(Field::new(format!("{name}_{i}"), data_type.clone(), false));
63            }
64        }
65
66        fields.push(Field::new("flags", DataType::UInt8, false));
67        fields.push(Field::new("sequence", DataType::UInt64, false));
68        fields.push(Field::new("ts_event", DataType::UInt64, false));
69        fields.push(Field::new("ts_init", DataType::UInt64, false));
70
71        match metadata {
72            Some(metadata) => Schema::new_with_metadata(fields, metadata),
73            None => Schema::new(fields),
74        }
75    }
76}
77
78fn parse_metadata(
79    metadata: &HashMap<String, String>,
80) -> Result<(InstrumentId, u8, u8), EncodingError> {
81    let instrument_id_str = metadata
82        .get(KEY_INSTRUMENT_ID)
83        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
84    let instrument_id = InstrumentId::from_str(instrument_id_str)
85        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
86
87    let price_precision = metadata
88        .get(KEY_PRICE_PRECISION)
89        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
90        .parse::<u8>()
91        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
92
93    let size_precision = metadata
94        .get(KEY_SIZE_PRECISION)
95        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
96        .parse::<u8>()
97        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
98
99    Ok((instrument_id, price_precision, size_precision))
100}
101
102impl EncodeToRecordBatch for OrderBookDepth10 {
103    fn encode_batch(
104        metadata: &HashMap<String, String>,
105        data: &[Self],
106    ) -> Result<RecordBatch, ArrowError> {
107        let mut bid_price_builders = Vec::with_capacity(DEPTH10_LEN);
108        let mut ask_price_builders = Vec::with_capacity(DEPTH10_LEN);
109        let mut bid_size_builders = Vec::with_capacity(DEPTH10_LEN);
110        let mut ask_size_builders = Vec::with_capacity(DEPTH10_LEN);
111        let mut bid_count_builders = Vec::with_capacity(DEPTH10_LEN);
112        let mut ask_count_builders = Vec::with_capacity(DEPTH10_LEN);
113
114        for _ in 0..DEPTH10_LEN {
115            bid_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
116                data.len(),
117                PRECISION_BYTES,
118            ));
119            ask_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
120                data.len(),
121                PRECISION_BYTES,
122            ));
123            bid_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
124                data.len(),
125                PRECISION_BYTES,
126            ));
127            ask_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
128                data.len(),
129                PRECISION_BYTES,
130            ));
131            bid_count_builders.push(UInt32Array::builder(data.len()));
132            ask_count_builders.push(UInt32Array::builder(data.len()));
133        }
134
135        let mut flags_builder = UInt8Array::builder(data.len());
136        let mut sequence_builder = UInt64Array::builder(data.len());
137        let mut ts_event_builder = UInt64Array::builder(data.len());
138        let mut ts_init_builder = UInt64Array::builder(data.len());
139
140        for depth in data {
141            for i in 0..DEPTH10_LEN {
142                bid_price_builders[i]
143                    .append_value(depth.bids[i].price.raw.to_le_bytes())
144                    .unwrap();
145                ask_price_builders[i]
146                    .append_value(depth.asks[i].price.raw.to_le_bytes())
147                    .unwrap();
148                bid_size_builders[i]
149                    .append_value(depth.bids[i].size.raw.to_le_bytes())
150                    .unwrap();
151                ask_size_builders[i]
152                    .append_value(depth.asks[i].size.raw.to_le_bytes())
153                    .unwrap();
154                bid_count_builders[i].append_value(depth.bid_counts[i]);
155                ask_count_builders[i].append_value(depth.ask_counts[i]);
156            }
157
158            flags_builder.append_value(depth.flags);
159            sequence_builder.append_value(depth.sequence);
160            ts_event_builder.append_value(depth.ts_event.as_u64());
161            ts_init_builder.append_value(depth.ts_init.as_u64());
162        }
163
164        let bid_price_arrays = bid_price_builders
165            .into_iter()
166            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
167            .collect::<Vec<_>>();
168        let ask_price_arrays = ask_price_builders
169            .into_iter()
170            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
171            .collect::<Vec<_>>();
172        let bid_size_arrays = bid_size_builders
173            .into_iter()
174            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
175            .collect::<Vec<_>>();
176        let ask_size_arrays = ask_size_builders
177            .into_iter()
178            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
179            .collect::<Vec<_>>();
180        let bid_count_arrays = bid_count_builders
181            .into_iter()
182            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
183            .collect::<Vec<_>>();
184        let ask_count_arrays = ask_count_builders
185            .into_iter()
186            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
187            .collect::<Vec<_>>();
188
189        let flags_array = Arc::new(flags_builder.finish()) as Arc<dyn Array>;
190        let sequence_array = Arc::new(sequence_builder.finish()) as Arc<dyn Array>;
191        let ts_event_array = Arc::new(ts_event_builder.finish()) as Arc<dyn Array>;
192        let ts_init_array = Arc::new(ts_init_builder.finish()) as Arc<dyn Array>;
193
194        let mut columns = Vec::new();
195        columns.extend(bid_price_arrays);
196        columns.extend(ask_price_arrays);
197        columns.extend(bid_size_arrays);
198        columns.extend(ask_size_arrays);
199        columns.extend(bid_count_arrays);
200        columns.extend(ask_count_arrays);
201        columns.push(flags_array);
202        columns.push(sequence_array);
203        columns.push(ts_event_array);
204        columns.push(ts_init_array);
205
206        RecordBatch::try_new(Self::get_schema(Some(metadata.clone())).into(), columns)
207    }
208
209    fn metadata(&self) -> HashMap<String, String> {
210        Self::get_metadata(
211            &self.instrument_id,
212            self.bids[0].price.precision,
213            self.bids[0].size.precision,
214        )
215    }
216}
217
218impl DecodeFromRecordBatch for OrderBookDepth10 {
219    fn decode_batch(
220        metadata: &HashMap<String, String>,
221        record_batch: RecordBatch,
222    ) -> Result<Vec<Self>, EncodingError> {
223        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
224        let cols = record_batch.columns();
225
226        let mut bid_prices = Vec::with_capacity(DEPTH10_LEN);
227        let mut ask_prices = Vec::with_capacity(DEPTH10_LEN);
228        let mut bid_sizes = Vec::with_capacity(DEPTH10_LEN);
229        let mut ask_sizes = Vec::with_capacity(DEPTH10_LEN);
230        let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
231        let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
232
233        macro_rules! extract_depth_column {
234            ($array:ty, $name:literal, $i:expr, $offset:expr, $type:expr) => {
235                extract_column::<$array>(cols, concat!($name, "_", stringify!($i)), $offset, $type)?
236            };
237        }
238
239        for i in 0..DEPTH10_LEN {
240            bid_prices.push(extract_depth_column!(
241                FixedSizeBinaryArray,
242                "bid_price",
243                i,
244                i,
245                DataType::FixedSizeBinary(PRECISION_BYTES)
246            ));
247            ask_prices.push(extract_depth_column!(
248                FixedSizeBinaryArray,
249                "ask_price",
250                i,
251                DEPTH10_LEN + i,
252                DataType::FixedSizeBinary(PRECISION_BYTES)
253            ));
254            bid_sizes.push(extract_depth_column!(
255                FixedSizeBinaryArray,
256                "bid_size",
257                i,
258                2 * DEPTH10_LEN + i,
259                DataType::FixedSizeBinary(PRECISION_BYTES)
260            ));
261            ask_sizes.push(extract_depth_column!(
262                FixedSizeBinaryArray,
263                "ask_size",
264                i,
265                3 * DEPTH10_LEN + i,
266                DataType::FixedSizeBinary(PRECISION_BYTES)
267            ));
268            bid_counts.push(extract_depth_column!(
269                UInt32Array,
270                "bid_count",
271                i,
272                4 * DEPTH10_LEN + i,
273                DataType::UInt32
274            ));
275            ask_counts.push(extract_depth_column!(
276                UInt32Array,
277                "ask_count",
278                i,
279                5 * DEPTH10_LEN + i,
280                DataType::UInt32
281            ));
282        }
283
284        for i in 0..DEPTH10_LEN {
285            validate_precision_bytes(bid_prices[i], "bid_price")?;
286            validate_precision_bytes(ask_prices[i], "ask_price")?;
287            validate_precision_bytes(bid_sizes[i], "bid_size")?;
288            validate_precision_bytes(ask_sizes[i], "ask_size")?;
289        }
290
291        let flags = extract_column::<UInt8Array>(cols, "flags", 6 * DEPTH10_LEN, DataType::UInt8)?;
292        let sequence =
293            extract_column::<UInt64Array>(cols, "sequence", 6 * DEPTH10_LEN + 1, DataType::UInt64)?;
294        let ts_event =
295            extract_column::<UInt64Array>(cols, "ts_event", 6 * DEPTH10_LEN + 2, DataType::UInt64)?;
296        let ts_init =
297            extract_column::<UInt64Array>(cols, "ts_init", 6 * DEPTH10_LEN + 3, DataType::UInt64)?;
298
299        // Map record batch rows to vector of OrderBookDepth10
300        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
301            .map(|row| {
302                let mut bids = [BookOrder::default(); DEPTH10_LEN];
303                let mut asks = [BookOrder::default(); DEPTH10_LEN];
304                let mut bid_count_arr = [0u32; DEPTH10_LEN];
305                let mut ask_count_arr = [0u32; DEPTH10_LEN];
306
307                for i in 0..DEPTH10_LEN {
308                    let bid_price =
309                        decode_price(bid_prices[i].value(row), price_precision, "bid_price", row)?;
310                    let bid_size =
311                        decode_quantity(bid_sizes[i].value(row), size_precision, "bid_size", row)?;
312                    bids[i] = BookOrder::new(OrderSide::Buy, bid_price, bid_size, 0);
313
314                    let ask_price =
315                        decode_price(ask_prices[i].value(row), price_precision, "ask_price", row)?;
316                    let ask_size =
317                        decode_quantity(ask_sizes[i].value(row), size_precision, "ask_size", row)?;
318                    asks[i] = BookOrder::new(OrderSide::Sell, ask_price, ask_size, 0);
319
320                    bid_count_arr[i] = bid_counts[i].value(row);
321                    ask_count_arr[i] = ask_counts[i].value(row);
322                }
323
324                Ok(Self {
325                    instrument_id,
326                    bids,
327                    asks,
328                    bid_counts: bid_count_arr,
329                    ask_counts: ask_count_arr,
330                    flags: flags.value(row),
331                    sequence: sequence.value(row),
332                    ts_event: ts_event.value(row).into(),
333                    ts_init: ts_init.value(row).into(),
334                })
335            })
336            .collect();
337
338        result
339    }
340}
341
342impl DecodeDataFromRecordBatch for OrderBookDepth10 {
343    fn decode_data_batch(
344        metadata: &HashMap<String, String>,
345        record_batch: RecordBatch,
346    ) -> Result<Vec<Data>, EncodingError> {
347        let depths: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
348        Ok(depths.into_iter().map(Data::from).collect())
349    }
350}
351
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use nautilus_model::{
        data::stubs::stub_depth10,
        types::{Price, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
    };
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    use super::*;
    use crate::arrow::{get_raw_price, get_raw_quantity};

    /// Metadata shared by the tests: `AAPL.XNAS`, price precision 2, size precision 0.
    fn stub_metadata() -> HashMap<String, String> {
        OrderBookDepth10::get_metadata(&InstrumentId::from("AAPL.XNAS"), 2, 0)
    }

    /// Downcasts the `DEPTH10_LEN` columns of per-level group `group` to `T`.
    fn level_columns<T: 'static>(columns: &[Arc<dyn Array>], group: usize) -> Vec<&T> {
        (0..DEPTH10_LEN)
            .map(|i| {
                columns[group * DEPTH10_LEN + i]
                    .as_any()
                    .downcast_ref::<T>()
                    .unwrap()
            })
            .collect()
    }

    /// Downcasts the column at `index` to a `UInt64Array`.
    fn u64_column(columns: &[Arc<dyn Array>], index: usize) -> &UInt64Array {
        columns[index]
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
    }

    /// Asserts that each level of a price group holds one row equal to `expected[level]`.
    fn assert_price_levels(columns: &[Arc<dyn Array>], group: usize, expected: &[f64]) {
        let levels = level_columns::<FixedSizeBinaryArray>(columns, group);

        for (i, level) in levels.iter().enumerate() {
            let raw = get_raw_price(level.value(0));

            assert_eq!(level.len(), 1);
            assert_eq!(raw, (expected[i] * FIXED_SCALAR) as PriceRaw);
            assert_eq!(Price::from_raw(raw, 2).as_f64(), expected[i]);
        }
    }

    /// Asserts that level `i` of a size group holds one row with size `100 * (i + 1)`.
    fn assert_size_levels(columns: &[Arc<dyn Array>], group: usize) {
        let levels = level_columns::<FixedSizeBinaryArray>(columns, group);

        for (i, level) in levels.iter().enumerate() {
            assert_eq!(level.len(), 1);
            assert_eq!(
                get_raw_quantity(level.value(0)),
                (100.0 * FIXED_SCALAR * ((i + 1) as f64)) as QuantityRaw
            );
        }
    }

    /// Asserts that every level of a count group holds one row with the value 1.
    fn assert_count_levels(columns: &[Arc<dyn Array>], group: usize) {
        for level in level_columns::<UInt32Array>(columns, group) {
            assert_eq!(level.len(), 1);
            assert_eq!(level.value(0), 1);
        }
    }

    /// Encodes `depth`, removes `key` from the metadata and asserts that decoding fails with
    /// an error mentioning `name`.
    fn assert_decode_fails_without(depth: OrderBookDepth10, key: &str, name: &str) {
        let mut metadata = stub_metadata();
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[depth]).unwrap();

        metadata.remove(key);

        let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains(name),
            "Expected missing {name} error, was: {err}"
        );
    }

    #[rstest]
    fn test_get_schema() {
        let schema = OrderBookDepth10::get_schema(Some(stub_metadata()));
        let groups = get_field_data();
        let trailing_start = groups.len() * DEPTH10_LEN;

        for (group, (name, data_type)) in groups.into_iter().enumerate() {
            for i in 0..DEPTH10_LEN {
                assert_eq!(
                    schema.field(group * DEPTH10_LEN + i).clone(),
                    Field::new(format!("{name}_{i}"), data_type.clone(), false)
                );
            }
        }

        let trailing = [
            ("flags", DataType::UInt8),
            ("sequence", DataType::UInt64),
            ("ts_event", DataType::UInt64),
            ("ts_init", DataType::UInt64),
        ];
        for (offset, (name, data_type)) in trailing.into_iter().enumerate() {
            assert_eq!(
                schema.field(trailing_start + offset).clone(),
                Field::new(name, data_type, false)
            );
        }

        assert_eq!(schema.metadata()["instrument_id"], "AAPL.XNAS");
        assert_eq!(schema.metadata()["price_precision"], "2");
        assert_eq!(schema.metadata()["size_precision"], "0");
    }

    #[rstest]
    fn test_get_schema_map() {
        let schema_map = OrderBookDepth10::get_schema_map();

        for (name, data_type) in get_field_data() {
            let expected = format!("{data_type:?}");

            for i in 0..DEPTH10_LEN {
                let key = format!("{name}_{i}");
                assert_eq!(
                    schema_map.get(&key).map(String::as_str),
                    Some(expected.as_str())
                );
            }
        }

        for (key, expected) in [
            ("flags", "UInt8"),
            ("sequence", "UInt64"),
            ("ts_event", "UInt64"),
            ("ts_init", "UInt64"),
        ] {
            assert_eq!(schema_map.get(key).map(String::as_str), Some(expected));
        }
    }

    #[rstest]
    fn test_encode_batch(stub_depth10: OrderBookDepth10) {
        let record_batch =
            OrderBookDepth10::encode_batch(&stub_metadata(), &[stub_depth10]).unwrap();
        let columns = record_batch.columns();

        assert_eq!(columns.len(), DEPTH10_LEN * 6 + 4);

        // Per-level groups, in schema order
        assert_price_levels(
            columns,
            0,
            &[99.0, 98.0, 97.0, 96.0, 95.0, 94.0, 93.0, 92.0, 91.0, 90.0],
        );
        assert_price_levels(
            columns,
            1,
            &[
                100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0,
            ],
        );
        assert_size_levels(columns, 2);
        assert_size_levels(columns, 3);
        assert_count_levels(columns, 4);
        assert_count_levels(columns, 5);

        // Trailing scalar columns
        let trailing_start = 6 * DEPTH10_LEN;
        let flags_values = columns[trailing_start]
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        let sequence_values = u64_column(columns, trailing_start + 1);
        let ts_event_values = u64_column(columns, trailing_start + 2);
        let ts_init_values = u64_column(columns, trailing_start + 3);

        assert_eq!(flags_values.len(), 1);
        assert_eq!(flags_values.value(0), 0);
        assert_eq!(sequence_values.len(), 1);
        assert_eq!(sequence_values.value(0), 0);
        assert_eq!(ts_event_values.len(), 1);
        assert_eq!(ts_event_values.value(0), 1);
        assert_eq!(ts_init_values.len(), 1);
        assert_eq!(ts_init_values.value(0), 2);
    }

    #[rstest]
    fn test_decode_batch(stub_depth10: OrderBookDepth10) {
        let metadata = stub_metadata();

        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
        let decoded_data = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded_data.len(), 1);
    }

    #[rstest]
    fn test_decode_batch_missing_instrument_id_returns_error(stub_depth10: OrderBookDepth10) {
        assert_decode_fails_without(stub_depth10, KEY_INSTRUMENT_ID, "instrument_id");
    }

    #[rstest]
    fn test_decode_batch_missing_price_precision_returns_error(stub_depth10: OrderBookDepth10) {
        assert_decode_fails_without(stub_depth10, KEY_PRICE_PRECISION, "price_precision");
    }

    #[rstest]
    fn test_encode_decode_round_trip(stub_depth10: OrderBookDepth10) {
        let metadata = stub_metadata();

        let original = vec![stub_depth10];
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &original).unwrap();
        let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded.len(), original.len());
        let (orig, dec) = (&original[0], &decoded[0]);

        assert_eq!(dec.instrument_id, orig.instrument_id);
        assert_eq!(dec.flags, orig.flags);
        assert_eq!(dec.sequence, orig.sequence);
        assert_eq!(dec.ts_event, orig.ts_event);
        assert_eq!(dec.ts_init, orig.ts_init);

        for i in 0..DEPTH10_LEN {
            let (dec_bid, orig_bid) = (&dec.bids[i], &orig.bids[i]);
            let (dec_ask, orig_ask) = (&dec.asks[i], &orig.asks[i]);

            assert_eq!(
                dec_bid.price, orig_bid.price,
                "bid price mismatch at level {i}"
            );
            assert_eq!(
                dec_bid.size, orig_bid.size,
                "bid size mismatch at level {i}"
            );
            assert_eq!(
                dec_ask.price, orig_ask.price,
                "ask price mismatch at level {i}"
            );
            assert_eq!(
                dec_ask.size, orig_ask.size,
                "ask size mismatch at level {i}"
            );
        }
    }
}