nautilus_serialization/arrow/
depth.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{
20        Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt32Array, UInt64Array,
21    },
22    datatypes::{DataType, Field, Schema},
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::{
27    data::{
28        depth::{DEPTH10_LEN, OrderBookDepth10},
29        order::BookOrder,
30    },
31    enums::OrderSide,
32    identifiers::InstrumentId,
33    types::fixed::PRECISION_BYTES,
34};
35
36use super::{
37    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
38    KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column,
39};
40use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
41
42fn get_field_data() -> Vec<(&'static str, DataType)> {
43    vec![
44        ("bid_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
45        ("ask_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
46        ("bid_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
47        ("ask_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
48        ("bid_count", DataType::UInt32),
49        ("ask_count", DataType::UInt32),
50    ]
51}
52
53impl ArrowSchemaProvider for OrderBookDepth10 {
54    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
55        let mut fields = Vec::new();
56        let field_data = get_field_data();
57
58        // Schema is of the form:
59        // bid_price_0, bid_price_1, ..., bid_price_9, ask_price_0, ask_price_1
60        for (name, data_type) in field_data {
61            for i in 0..DEPTH10_LEN {
62                fields.push(Field::new(format!("{name}_{i}"), data_type.clone(), false));
63            }
64        }
65
66        fields.push(Field::new("flags", DataType::UInt8, false));
67        fields.push(Field::new("sequence", DataType::UInt64, false));
68        fields.push(Field::new("ts_event", DataType::UInt64, false));
69        fields.push(Field::new("ts_init", DataType::UInt64, false));
70
71        match metadata {
72            Some(metadata) => Schema::new_with_metadata(fields, metadata),
73            None => Schema::new(fields),
74        }
75    }
76}
77
78fn parse_metadata(
79    metadata: &HashMap<String, String>,
80) -> Result<(InstrumentId, u8, u8), EncodingError> {
81    let instrument_id_str = metadata
82        .get(KEY_INSTRUMENT_ID)
83        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
84    let instrument_id = InstrumentId::from_str(instrument_id_str)
85        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
86
87    let price_precision = metadata
88        .get(KEY_PRICE_PRECISION)
89        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
90        .parse::<u8>()
91        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
92
93    let size_precision = metadata
94        .get(KEY_SIZE_PRECISION)
95        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
96        .parse::<u8>()
97        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
98
99    Ok((instrument_id, price_precision, size_precision))
100}
101
102impl EncodeToRecordBatch for OrderBookDepth10 {
103    fn encode_batch(
104        metadata: &HashMap<String, String>,
105        data: &[Self],
106    ) -> Result<RecordBatch, ArrowError> {
107        let mut bid_price_builders = Vec::with_capacity(DEPTH10_LEN);
108        let mut ask_price_builders = Vec::with_capacity(DEPTH10_LEN);
109        let mut bid_size_builders = Vec::with_capacity(DEPTH10_LEN);
110        let mut ask_size_builders = Vec::with_capacity(DEPTH10_LEN);
111        let mut bid_count_builders = Vec::with_capacity(DEPTH10_LEN);
112        let mut ask_count_builders = Vec::with_capacity(DEPTH10_LEN);
113
114        for _ in 0..DEPTH10_LEN {
115            bid_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
116                data.len(),
117                PRECISION_BYTES,
118            ));
119            ask_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
120                data.len(),
121                PRECISION_BYTES,
122            ));
123            bid_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
124                data.len(),
125                PRECISION_BYTES,
126            ));
127            ask_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
128                data.len(),
129                PRECISION_BYTES,
130            ));
131            bid_count_builders.push(UInt32Array::builder(data.len()));
132            ask_count_builders.push(UInt32Array::builder(data.len()));
133        }
134
135        let mut flags_builder = UInt8Array::builder(data.len());
136        let mut sequence_builder = UInt64Array::builder(data.len());
137        let mut ts_event_builder = UInt64Array::builder(data.len());
138        let mut ts_init_builder = UInt64Array::builder(data.len());
139
140        for depth in data {
141            for i in 0..DEPTH10_LEN {
142                bid_price_builders[i]
143                    .append_value(depth.bids[i].price.raw.to_le_bytes())
144                    .unwrap();
145                ask_price_builders[i]
146                    .append_value(depth.asks[i].price.raw.to_le_bytes())
147                    .unwrap();
148                bid_size_builders[i]
149                    .append_value(depth.bids[i].size.raw.to_le_bytes())
150                    .unwrap();
151                ask_size_builders[i]
152                    .append_value(depth.asks[i].size.raw.to_le_bytes())
153                    .unwrap();
154                bid_count_builders[i].append_value(depth.bid_counts[i]);
155                ask_count_builders[i].append_value(depth.ask_counts[i]);
156            }
157
158            flags_builder.append_value(depth.flags);
159            sequence_builder.append_value(depth.sequence);
160            ts_event_builder.append_value(depth.ts_event.as_u64());
161            ts_init_builder.append_value(depth.ts_init.as_u64());
162        }
163
164        let bid_price_arrays = bid_price_builders
165            .into_iter()
166            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
167            .collect::<Vec<_>>();
168        let ask_price_arrays = ask_price_builders
169            .into_iter()
170            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
171            .collect::<Vec<_>>();
172        let bid_size_arrays = bid_size_builders
173            .into_iter()
174            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
175            .collect::<Vec<_>>();
176        let ask_size_arrays = ask_size_builders
177            .into_iter()
178            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
179            .collect::<Vec<_>>();
180        let bid_count_arrays = bid_count_builders
181            .into_iter()
182            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
183            .collect::<Vec<_>>();
184        let ask_count_arrays = ask_count_builders
185            .into_iter()
186            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
187            .collect::<Vec<_>>();
188
189        let flags_array = Arc::new(flags_builder.finish()) as Arc<dyn Array>;
190        let sequence_array = Arc::new(sequence_builder.finish()) as Arc<dyn Array>;
191        let ts_event_array = Arc::new(ts_event_builder.finish()) as Arc<dyn Array>;
192        let ts_init_array = Arc::new(ts_init_builder.finish()) as Arc<dyn Array>;
193
194        let mut columns = Vec::new();
195        columns.extend(bid_price_arrays);
196        columns.extend(ask_price_arrays);
197        columns.extend(bid_size_arrays);
198        columns.extend(ask_size_arrays);
199        columns.extend(bid_count_arrays);
200        columns.extend(ask_count_arrays);
201        columns.push(flags_array);
202        columns.push(sequence_array);
203        columns.push(ts_event_array);
204        columns.push(ts_init_array);
205
206        RecordBatch::try_new(Self::get_schema(Some(metadata.clone())).into(), columns)
207    }
208
209    fn metadata(&self) -> HashMap<String, String> {
210        Self::get_metadata(
211            &self.instrument_id,
212            self.bids[0].price.precision,
213            self.bids[0].size.precision,
214        )
215    }
216}
217
218impl DecodeFromRecordBatch for OrderBookDepth10 {
219    fn decode_batch(
220        metadata: &HashMap<String, String>,
221        record_batch: RecordBatch,
222    ) -> Result<Vec<Self>, EncodingError> {
223        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
224        let cols = record_batch.columns();
225
226        let mut bid_prices = Vec::with_capacity(DEPTH10_LEN);
227        let mut ask_prices = Vec::with_capacity(DEPTH10_LEN);
228        let mut bid_sizes = Vec::with_capacity(DEPTH10_LEN);
229        let mut ask_sizes = Vec::with_capacity(DEPTH10_LEN);
230        let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
231        let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
232
233        macro_rules! extract_depth_column {
234            ($array:ty, $name:literal, $i:expr, $offset:expr, $type:expr) => {
235                extract_column::<$array>(cols, concat!($name, "_", stringify!($i)), $offset, $type)?
236            };
237        }
238
239        for i in 0..DEPTH10_LEN {
240            bid_prices.push(extract_depth_column!(
241                FixedSizeBinaryArray,
242                "bid_price",
243                i,
244                i,
245                DataType::FixedSizeBinary(PRECISION_BYTES)
246            ));
247            ask_prices.push(extract_depth_column!(
248                FixedSizeBinaryArray,
249                "ask_price",
250                i,
251                DEPTH10_LEN + i,
252                DataType::FixedSizeBinary(PRECISION_BYTES)
253            ));
254            bid_sizes.push(extract_depth_column!(
255                FixedSizeBinaryArray,
256                "bid_size",
257                i,
258                2 * DEPTH10_LEN + i,
259                DataType::FixedSizeBinary(PRECISION_BYTES)
260            ));
261            ask_sizes.push(extract_depth_column!(
262                FixedSizeBinaryArray,
263                "ask_size",
264                i,
265                3 * DEPTH10_LEN + i,
266                DataType::FixedSizeBinary(PRECISION_BYTES)
267            ));
268            bid_counts.push(extract_depth_column!(
269                UInt32Array,
270                "bid_count",
271                i,
272                4 * DEPTH10_LEN + i,
273                DataType::UInt32
274            ));
275            ask_counts.push(extract_depth_column!(
276                UInt32Array,
277                "ask_count",
278                i,
279                5 * DEPTH10_LEN + i,
280                DataType::UInt32
281            ));
282        }
283
284        for i in 0..DEPTH10_LEN {
285            if bid_prices[i].value_length() != PRECISION_BYTES {
286                return Err(EncodingError::ParseError(
287                    "bid_price",
288                    format!(
289                        "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
290                        bid_prices[i].value_length()
291                    ),
292                ));
293            }
294            if ask_prices[i].value_length() != PRECISION_BYTES {
295                return Err(EncodingError::ParseError(
296                    "ask_price",
297                    format!(
298                        "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
299                        ask_prices[i].value_length()
300                    ),
301                ));
302            }
303            if bid_sizes[i].value_length() != PRECISION_BYTES {
304                return Err(EncodingError::ParseError(
305                    "bid_size",
306                    format!(
307                        "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
308                        bid_sizes[i].value_length()
309                    ),
310                ));
311            }
312            if ask_sizes[i].value_length() != PRECISION_BYTES {
313                return Err(EncodingError::ParseError(
314                    "ask_size",
315                    format!(
316                        "Invalid value length at index {i}: expected {PRECISION_BYTES}, found {}",
317                        ask_sizes[i].value_length()
318                    ),
319                ));
320            }
321        }
322
323        let flags = extract_column::<UInt8Array>(cols, "flags", 6 * DEPTH10_LEN, DataType::UInt8)?;
324        let sequence =
325            extract_column::<UInt64Array>(cols, "sequence", 6 * DEPTH10_LEN + 1, DataType::UInt64)?;
326        let ts_event =
327            extract_column::<UInt64Array>(cols, "ts_event", 6 * DEPTH10_LEN + 2, DataType::UInt64)?;
328        let ts_init =
329            extract_column::<UInt64Array>(cols, "ts_init", 6 * DEPTH10_LEN + 3, DataType::UInt64)?;
330
331        // Map record batch rows to vector of OrderBookDepth10
332        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
333            .map(|row| {
334                let mut bids = [BookOrder::default(); DEPTH10_LEN];
335                let mut asks = [BookOrder::default(); DEPTH10_LEN];
336                let mut bid_count_arr = [0u32; DEPTH10_LEN];
337                let mut ask_count_arr = [0u32; DEPTH10_LEN];
338
339                for i in 0..DEPTH10_LEN {
340                    let bid_price =
341                        decode_price(bid_prices[i].value(row), price_precision, "bid_price", row)?;
342                    let bid_size =
343                        decode_quantity(bid_sizes[i].value(row), size_precision, "bid_size", row)?;
344                    bids[i] = BookOrder::new(OrderSide::Buy, bid_price, bid_size, 0);
345
346                    let ask_price =
347                        decode_price(ask_prices[i].value(row), price_precision, "ask_price", row)?;
348                    let ask_size =
349                        decode_quantity(ask_sizes[i].value(row), size_precision, "ask_size", row)?;
350                    asks[i] = BookOrder::new(OrderSide::Sell, ask_price, ask_size, 0);
351
352                    bid_count_arr[i] = bid_counts[i].value(row);
353                    ask_count_arr[i] = ask_counts[i].value(row);
354                }
355
356                Ok(Self {
357                    instrument_id,
358                    bids,
359                    asks,
360                    bid_counts: bid_count_arr,
361                    ask_counts: ask_count_arr,
362                    flags: flags.value(row),
363                    sequence: sequence.value(row),
364                    ts_event: ts_event.value(row).into(),
365                    ts_init: ts_init.value(row).into(),
366                })
367            })
368            .collect();
369
370        result
371    }
372}
373
374impl DecodeDataFromRecordBatch for OrderBookDepth10 {
375    fn decode_data_batch(
376        metadata: &HashMap<String, String>,
377        record_batch: RecordBatch,
378    ) -> Result<Vec<Data>, EncodingError> {
379        let depths: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
380        Ok(depths.into_iter().map(Data::from).collect())
381    }
382}
383
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use nautilus_model::{
        data::stubs::stub_depth10,
        types::{Price, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
    };
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    use super::*;
    use crate::arrow::{get_raw_price, get_raw_quantity};

    /// Downcasts the column at `index` to the concrete Arrow array type `T`.
    ///
    /// Panics if the column holds a different array type, which fails the calling test.
    fn column_as<T: Array + 'static>(columns: &[Arc<dyn Array>], index: usize) -> &T {
        columns[index]
            .as_any()
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("column {index} has an unexpected array type"))
    }

    #[rstest]
    fn test_get_schema() {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
        let schema = OrderBookDepth10::get_schema(Some(metadata));

        // Per-level groups come first, each DEPTH10_LEN columns wide
        let field_data = get_field_data();
        let group_count = field_data.len();
        for (group, (name, data_type)) in field_data.into_iter().enumerate() {
            for i in 0..DEPTH10_LEN {
                let field = schema.field(group * DEPTH10_LEN + i).clone();
                assert_eq!(
                    field,
                    Field::new(format!("{name}_{i}"), data_type.clone(), false)
                );
            }
        }

        // Scalar columns follow the per-level groups
        let trailing = [
            ("flags", DataType::UInt8),
            ("sequence", DataType::UInt64),
            ("ts_event", DataType::UInt64),
            ("ts_init", DataType::UInt64),
        ];
        for (offset, (name, data_type)) in trailing.iter().enumerate() {
            let field = schema.field(group_count * DEPTH10_LEN + offset).clone();
            assert_eq!(field, Field::new(*name, data_type.clone(), false));
        }

        assert_eq!(schema.metadata()["instrument_id"], "AAPL.XNAS");
        assert_eq!(schema.metadata()["price_precision"], "2");
        assert_eq!(schema.metadata()["size_precision"], "0");
    }

    #[rstest]
    fn test_get_schema_map() {
        let schema_map = OrderBookDepth10::get_schema_map();

        for (name, data_type) in get_field_data() {
            for i in 0..DEPTH10_LEN {
                let field = schema_map.get(&format!("{name}_{i}")).map(String::as_str);
                assert_eq!(field, Some(format!("{data_type:?}").as_str()));
            }
        }

        for (name, expected) in [
            ("flags", "UInt8"),
            ("sequence", "UInt64"),
            ("ts_event", "UInt64"),
            ("ts_init", "UInt64"),
        ] {
            assert_eq!(schema_map.get(name).map(String::as_str), Some(expected));
        }
    }

    #[rstest]
    fn test_encode_batch(stub_depth10: OrderBookDepth10) {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let price_precision = 2;
        let metadata = OrderBookDepth10::get_metadata(&instrument_id, price_precision, 0);

        let data = vec![stub_depth10];
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
        let columns = record_batch.columns();

        assert_eq!(columns.len(), DEPTH10_LEN * 6 + 4);

        // Bid prices
        let expected_bid_prices: Vec<f64> =
            vec![99.0, 98.0, 97.0, 96.0, 95.0, 94.0, 93.0, 92.0, 91.0, 90.0];

        for i in 0..DEPTH10_LEN {
            let bid_price = column_as::<FixedSizeBinaryArray>(columns, i);
            assert_eq!(bid_price.len(), 1);
            assert_eq!(
                get_raw_price(bid_price.value(0)),
                (expected_bid_prices[i] * FIXED_SCALAR) as PriceRaw
            );
            assert_eq!(
                Price::from_raw(get_raw_price(bid_price.value(0)), price_precision).as_f64(),
                expected_bid_prices[i]
            );
        }

        // Ask prices
        let expected_ask_prices: Vec<f64> = vec![
            100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0,
        ];

        for i in 0..DEPTH10_LEN {
            let ask_price = column_as::<FixedSizeBinaryArray>(columns, DEPTH10_LEN + i);
            assert_eq!(ask_price.len(), 1);
            assert_eq!(
                get_raw_price(ask_price.value(0)),
                (expected_ask_prices[i] * FIXED_SCALAR) as PriceRaw
            );
            assert_eq!(
                Price::from_raw(get_raw_price(ask_price.value(0)), price_precision).as_f64(),
                expected_ask_prices[i]
            );
        }

        // Bid sizes
        for i in 0..DEPTH10_LEN {
            let bid_size = column_as::<FixedSizeBinaryArray>(columns, 2 * DEPTH10_LEN + i);
            assert_eq!(bid_size.len(), 1);
            assert_eq!(
                get_raw_quantity(bid_size.value(0)),
                ((100.0 * FIXED_SCALAR * (i + 1) as f64) as QuantityRaw)
            );
        }

        // Ask sizes
        for i in 0..DEPTH10_LEN {
            let ask_size = column_as::<FixedSizeBinaryArray>(columns, 3 * DEPTH10_LEN + i);
            assert_eq!(ask_size.len(), 1);
            assert_eq!(
                get_raw_quantity(ask_size.value(0)),
                ((100.0 * FIXED_SCALAR * ((i + 1) as f64)) as QuantityRaw)
            );
        }

        // Bid counts
        for i in 0..DEPTH10_LEN {
            let bid_count = column_as::<UInt32Array>(columns, 4 * DEPTH10_LEN + i);
            assert_eq!(bid_count.len(), 1);
            assert_eq!(bid_count.value(0), 1);
        }

        // Ask counts
        for i in 0..DEPTH10_LEN {
            let ask_count = column_as::<UInt32Array>(columns, 5 * DEPTH10_LEN + i);
            assert_eq!(ask_count.len(), 1);
            assert_eq!(ask_count.value(0), 1);
        }

        // Remaining scalar fields
        let flags_values = column_as::<UInt8Array>(columns, 6 * DEPTH10_LEN);
        let sequence_values = column_as::<UInt64Array>(columns, 6 * DEPTH10_LEN + 1);
        let ts_event_values = column_as::<UInt64Array>(columns, 6 * DEPTH10_LEN + 2);
        let ts_init_values = column_as::<UInt64Array>(columns, 6 * DEPTH10_LEN + 3);

        assert_eq!(flags_values.len(), 1);
        assert_eq!(flags_values.value(0), 0);
        assert_eq!(sequence_values.len(), 1);
        assert_eq!(sequence_values.value(0), 0);
        assert_eq!(ts_event_values.len(), 1);
        assert_eq!(ts_event_values.value(0), 1);
        assert_eq!(ts_init_values.len(), 1);
        assert_eq!(ts_init_values.value(0), 2);
    }

    #[rstest]
    fn test_decode_batch(stub_depth10: OrderBookDepth10) {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);

        let data = vec![stub_depth10];
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
        let decoded_data = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded_data.len(), 1);
    }

    #[rstest]
    fn test_decode_batch_missing_instrument_id_returns_error(stub_depth10: OrderBookDepth10) {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();

        metadata.remove(KEY_INSTRUMENT_ID);

        let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("instrument_id"),
            "Expected missing instrument_id error, got: {err}"
        );
    }

    #[rstest]
    fn test_decode_batch_missing_price_precision_returns_error(stub_depth10: OrderBookDepth10) {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();

        metadata.remove(KEY_PRICE_PRECISION);

        let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("price_precision"),
            "Expected missing price_precision error, got: {err}"
        );
    }

    #[rstest]
    fn test_decode_batch_missing_size_precision_returns_error(stub_depth10: OrderBookDepth10) {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();

        metadata.remove(KEY_SIZE_PRECISION);

        let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("size_precision"),
            "Expected missing size_precision error, got: {err}"
        );
    }

    #[rstest]
    fn test_encode_decode_round_trip(stub_depth10: OrderBookDepth10) {
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);

        let original = vec![stub_depth10];
        let record_batch = OrderBookDepth10::encode_batch(&metadata, &original).unwrap();
        let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded.len(), original.len());
        let orig = &original[0];
        let dec = &decoded[0];

        assert_eq!(dec.instrument_id, orig.instrument_id);
        assert_eq!(dec.flags, orig.flags);
        assert_eq!(dec.sequence, orig.sequence);
        assert_eq!(dec.ts_event, orig.ts_event);
        assert_eq!(dec.ts_init, orig.ts_init);

        // Order counts travel in their own columns and must survive the round trip too
        assert_eq!(dec.bid_counts, orig.bid_counts);
        assert_eq!(dec.ask_counts, orig.ask_counts);

        for i in 0..DEPTH10_LEN {
            assert_eq!(
                dec.bids[i].price, orig.bids[i].price,
                "bid price mismatch at level {i}"
            );
            assert_eq!(
                dec.bids[i].size, orig.bids[i].size,
                "bid size mismatch at level {i}"
            );
            assert_eq!(
                dec.asks[i].price, orig.asks[i].price,
                "ask price mismatch at level {i}"
            );
            assert_eq!(
                dec.asks[i].size, orig.asks[i].size,
                "ask size mismatch at level {i}"
            );
        }
    }
}