Skip to main content

nautilus_serialization/arrow/
trade.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{
20        FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray, StringBuilder, StringViewArray,
21        UInt8Array, UInt64Array,
22    },
23    datatypes::{DataType, Field, Schema},
24    error::ArrowError,
25    record_batch::RecordBatch,
26};
27use nautilus_model::{
28    data::TradeTick,
29    enums::AggressorSide,
30    identifiers::{InstrumentId, TradeId},
31    types::fixed::PRECISION_BYTES,
32};
33
34use super::{
35    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
36    KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column, validate_precision_bytes,
37};
38use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
39
40impl ArrowSchemaProvider for TradeTick {
41    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
42        let fields = vec![
43            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
45            Field::new("aggressor_side", DataType::UInt8, false),
46            Field::new("trade_id", DataType::Utf8, false),
47            Field::new("ts_event", DataType::UInt64, false),
48            Field::new("ts_init", DataType::UInt64, false),
49        ];
50
51        match metadata {
52            Some(metadata) => Schema::new_with_metadata(fields, metadata),
53            None => Schema::new(fields),
54        }
55    }
56}
57
58fn parse_metadata(
59    metadata: &HashMap<String, String>,
60) -> Result<(InstrumentId, u8, u8), EncodingError> {
61    let instrument_id_str = metadata
62        .get(KEY_INSTRUMENT_ID)
63        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
64    let instrument_id = InstrumentId::from_str(instrument_id_str)
65        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
66
67    let price_precision = metadata
68        .get(KEY_PRICE_PRECISION)
69        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
70        .parse::<u8>()
71        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
72
73    let size_precision = metadata
74        .get(KEY_SIZE_PRECISION)
75        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
76        .parse::<u8>()
77        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
78
79    Ok((instrument_id, price_precision, size_precision))
80}
81
82impl EncodeToRecordBatch for TradeTick {
83    fn encode_batch(
84        metadata: &HashMap<String, String>,
85        data: &[Self],
86    ) -> Result<RecordBatch, ArrowError> {
87        let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
88        let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
89
90        let mut aggressor_side_builder = UInt8Array::builder(data.len());
91        let mut trade_id_builder = StringBuilder::new();
92        let mut ts_event_builder = UInt64Array::builder(data.len());
93        let mut ts_init_builder = UInt64Array::builder(data.len());
94
95        for tick in data {
96            price_builder
97                .append_value(tick.price.raw.to_le_bytes())
98                .unwrap();
99            size_builder
100                .append_value(tick.size.raw.to_le_bytes())
101                .unwrap();
102            aggressor_side_builder.append_value(tick.aggressor_side as u8);
103            trade_id_builder.append_value(tick.trade_id.to_string());
104            ts_event_builder.append_value(tick.ts_event.as_u64());
105            ts_init_builder.append_value(tick.ts_init.as_u64());
106        }
107
108        let price_array = Arc::new(price_builder.finish());
109        let size_array = Arc::new(size_builder.finish());
110        let aggressor_side_array = Arc::new(aggressor_side_builder.finish());
111        let trade_id_array = Arc::new(trade_id_builder.finish());
112        let ts_event_array = Arc::new(ts_event_builder.finish());
113        let ts_init_array = Arc::new(ts_init_builder.finish());
114
115        RecordBatch::try_new(
116            Self::get_schema(Some(metadata.clone())).into(),
117            vec![
118                price_array,
119                size_array,
120                aggressor_side_array,
121                trade_id_array,
122                ts_event_array,
123                ts_init_array,
124            ],
125        )
126    }
127
128    fn metadata(&self) -> HashMap<String, String> {
129        Self::get_metadata(
130            &self.instrument_id,
131            self.price.precision,
132            self.size.precision,
133        )
134    }
135}
136
137impl DecodeFromRecordBatch for TradeTick {
138    fn decode_batch(
139        metadata: &HashMap<String, String>,
140        record_batch: RecordBatch,
141    ) -> Result<Vec<Self>, EncodingError> {
142        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
143        let cols = record_batch.columns();
144
145        let price_values = extract_column::<FixedSizeBinaryArray>(
146            cols,
147            "price",
148            0,
149            DataType::FixedSizeBinary(PRECISION_BYTES),
150        )?;
151
152        let size_values = extract_column::<FixedSizeBinaryArray>(
153            cols,
154            "size",
155            1,
156            DataType::FixedSizeBinary(PRECISION_BYTES),
157        )?;
158
159        validate_precision_bytes(price_values, "price")?;
160        validate_precision_bytes(size_values, "size")?;
161
162        let aggressor_side_values =
163            extract_column::<UInt8Array>(cols, "aggressor_side", 2, DataType::UInt8)?;
164        let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 4, DataType::UInt64)?;
165        let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 5, DataType::UInt64)?;
166
167        // Datafusion reads trade_ids as StringView
168        let trade_id_values: Vec<TradeId> = if record_batch
169            .schema()
170            .field_with_name("trade_id")?
171            .data_type()
172            == &DataType::Utf8View
173        {
174            extract_column::<StringViewArray>(cols, "trade_id", 3, DataType::Utf8View)?
175                .iter()
176                .enumerate()
177                .map(|(i, id)| {
178                    id.map(TradeId::from).ok_or_else(|| {
179                        EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
180                    })
181                })
182                .collect::<Result<Vec<_>, _>>()?
183        } else {
184            extract_column::<StringArray>(cols, "trade_id", 3, DataType::Utf8)?
185                .iter()
186                .enumerate()
187                .map(|(i, id)| {
188                    id.map(TradeId::from).ok_or_else(|| {
189                        EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
190                    })
191                })
192                .collect::<Result<Vec<_>, _>>()?
193        };
194
195        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
196            .map(|i| {
197                let price = decode_price(price_values.value(i), price_precision, "price", i)?;
198                let size = decode_quantity(size_values.value(i), size_precision, "size", i)?;
199                let aggressor_side_value = aggressor_side_values.value(i);
200                let aggressor_side = AggressorSide::from_repr(aggressor_side_value as usize)
201                    .ok_or_else(|| {
202                        EncodingError::ParseError(
203                            stringify!(AggressorSide),
204                            format!("Invalid enum value, was {aggressor_side_value}"),
205                        )
206                    })?;
207                let trade_id = trade_id_values[i];
208                let ts_event = ts_event_values.value(i).into();
209                let ts_init = ts_init_values.value(i).into();
210
211                Ok(Self {
212                    instrument_id,
213                    price,
214                    size,
215                    aggressor_side,
216                    trade_id,
217                    ts_event,
218                    ts_init,
219                })
220            })
221            .collect();
222
223        result
224    }
225}
226
227impl DecodeDataFromRecordBatch for TradeTick {
228    fn decode_data_batch(
229        metadata: &HashMap<String, String>,
230        record_batch: RecordBatch,
231    ) -> Result<Vec<Data>, EncodingError> {
232        let ticks: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
233        Ok(ticks.into_iter().map(Data::from).collect())
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use std::sync::Arc;
240
241    use arrow::{
242        array::{Array, FixedSizeBinaryArray, UInt8Array, UInt64Array},
243        record_batch::RecordBatch,
244    };
245    use nautilus_model::types::{
246        Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw,
247    };
248    use rstest::rstest;
249
250    use super::*;
251    use crate::arrow::{get_raw_price, get_raw_quantity};
252
253    #[rstest]
254    fn test_get_schema() {
255        let instrument_id = InstrumentId::from("AAPL.XNAS");
256        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
257        let schema = TradeTick::get_schema(Some(metadata.clone()));
258
259        let mut expected_fields = Vec::with_capacity(6);
260
261        expected_fields.push(Field::new(
262            "price",
263            DataType::FixedSizeBinary(PRECISION_BYTES),
264            false,
265        ));
266
267        expected_fields.extend(vec![
268            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
269            Field::new("aggressor_side", DataType::UInt8, false),
270            Field::new("trade_id", DataType::Utf8, false),
271            Field::new("ts_event", DataType::UInt64, false),
272            Field::new("ts_init", DataType::UInt64, false),
273        ]);
274
275        let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
276        assert_eq!(schema, expected_schema);
277    }
278
279    #[rstest]
280    fn test_get_schema_map() {
281        let schema_map = TradeTick::get_schema_map();
282        let mut expected_map = HashMap::new();
283
284        let precision_bytes = format!("FixedSizeBinary({PRECISION_BYTES})");
285        expected_map.insert("price".to_string(), precision_bytes.clone());
286        expected_map.insert("size".to_string(), precision_bytes);
287        expected_map.insert("aggressor_side".to_string(), "UInt8".to_string());
288        expected_map.insert("trade_id".to_string(), "Utf8".to_string());
289        expected_map.insert("ts_event".to_string(), "UInt64".to_string());
290        expected_map.insert("ts_init".to_string(), "UInt64".to_string());
291        assert_eq!(schema_map, expected_map);
292    }
293
294    #[rstest]
295    fn test_encode_trade_tick() {
296        let instrument_id = InstrumentId::from("AAPL.XNAS");
297        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
298
299        let tick1 = TradeTick {
300            instrument_id,
301            price: Price::from("100.10"),
302            size: Quantity::from(1000),
303            aggressor_side: AggressorSide::Buyer,
304            trade_id: TradeId::new("1"),
305            ts_event: 1.into(),
306            ts_init: 3.into(),
307        };
308
309        let tick2 = TradeTick {
310            instrument_id,
311            price: Price::from("100.50"),
312            size: Quantity::from(500),
313            aggressor_side: AggressorSide::Seller,
314            trade_id: TradeId::new("2"),
315            ts_event: 2.into(),
316            ts_init: 4.into(),
317        };
318
319        let data = vec![tick1, tick2];
320        let record_batch = TradeTick::encode_batch(&metadata, &data).unwrap();
321        let columns = record_batch.columns();
322
323        let price_values = columns[0]
324            .as_any()
325            .downcast_ref::<FixedSizeBinaryArray>()
326            .unwrap();
327        assert_eq!(
328            get_raw_price(price_values.value(0)),
329            (100.10 * FIXED_SCALAR) as PriceRaw
330        );
331        assert_eq!(
332            get_raw_price(price_values.value(1)),
333            (100.50 * FIXED_SCALAR) as PriceRaw
334        );
335
336        let size_values = columns[1]
337            .as_any()
338            .downcast_ref::<FixedSizeBinaryArray>()
339            .unwrap();
340        assert_eq!(
341            get_raw_quantity(size_values.value(0)),
342            (1000.0 * FIXED_SCALAR) as QuantityRaw
343        );
344        assert_eq!(
345            get_raw_quantity(size_values.value(1)),
346            (500.0 * FIXED_SCALAR) as QuantityRaw
347        );
348
349        let aggressor_side_values = columns[2].as_any().downcast_ref::<UInt8Array>().unwrap();
350        let trade_id_values = columns[3].as_any().downcast_ref::<StringArray>().unwrap();
351        let ts_event_values = columns[4].as_any().downcast_ref::<UInt64Array>().unwrap();
352        let ts_init_values = columns[5].as_any().downcast_ref::<UInt64Array>().unwrap();
353
354        assert_eq!(columns.len(), 6);
355        assert_eq!(size_values.len(), 2);
356        assert_eq!(
357            get_raw_quantity(size_values.value(0)),
358            (1000.0 * FIXED_SCALAR) as QuantityRaw
359        );
360        assert_eq!(
361            get_raw_quantity(size_values.value(1)),
362            (500.0 * FIXED_SCALAR) as QuantityRaw
363        );
364        assert_eq!(aggressor_side_values.len(), 2);
365        assert_eq!(aggressor_side_values.value(0), 1);
366        assert_eq!(aggressor_side_values.value(1), 2);
367        assert_eq!(trade_id_values.len(), 2);
368        assert_eq!(trade_id_values.value(0), "1");
369        assert_eq!(trade_id_values.value(1), "2");
370        assert_eq!(ts_event_values.len(), 2);
371        assert_eq!(ts_event_values.value(0), 1);
372        assert_eq!(ts_event_values.value(1), 2);
373        assert_eq!(ts_init_values.len(), 2);
374        assert_eq!(ts_init_values.value(0), 3);
375        assert_eq!(ts_init_values.value(1), 4);
376    }
377
378    #[rstest]
379    fn test_decode_batch() {
380        let instrument_id = InstrumentId::from("AAPL.XNAS");
381        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
382
383        let raw_price1 = (100.00 * FIXED_SCALAR) as PriceRaw;
384        let raw_price2 = (101.00 * FIXED_SCALAR) as PriceRaw;
385        let price =
386            FixedSizeBinaryArray::from(vec![&raw_price1.to_le_bytes(), &raw_price2.to_le_bytes()]);
387
388        let size = FixedSizeBinaryArray::from(vec![
389            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
390            &((900.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
391        ]);
392        let aggressor_side = UInt8Array::from(vec![0, 1]); // 0 for BUY, 1 for SELL
393        let trade_id = StringArray::from(vec!["1", "2"]);
394        let ts_event = UInt64Array::from(vec![1, 2]);
395        let ts_init = UInt64Array::from(vec![3, 4]);
396
397        let record_batch = RecordBatch::try_new(
398            TradeTick::get_schema(Some(metadata.clone())).into(),
399            vec![
400                Arc::new(price),
401                Arc::new(size),
402                Arc::new(aggressor_side),
403                Arc::new(trade_id),
404                Arc::new(ts_event),
405                Arc::new(ts_init),
406            ],
407        )
408        .unwrap();
409
410        let decoded_data = TradeTick::decode_batch(&metadata, record_batch).unwrap();
411        assert_eq!(decoded_data.len(), 2);
412        assert_eq!(decoded_data[0].price, Price::from_raw(raw_price1, 2));
413        assert_eq!(decoded_data[1].price, Price::from_raw(raw_price2, 2));
414    }
415
416    #[rstest]
417    fn test_decode_batch_null_trade_id_returns_error() {
418        use arrow::datatypes::Field;
419
420        let instrument_id = InstrumentId::from("AAPL.XNAS");
421        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
422
423        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
424        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
425        let size = FixedSizeBinaryArray::from(vec![
426            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
427        ]);
428        let aggressor_side = UInt8Array::from(vec![0]);
429
430        let trade_id: StringArray = vec![None::<&str>].into();
431        let ts_event = UInt64Array::from(vec![1]);
432        let ts_init = UInt64Array::from(vec![2]);
433
434        // Create schema with nullable trade_id to simulate external data source
435        let fields = vec![
436            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
437            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
438            Field::new("aggressor_side", DataType::UInt8, false),
439            Field::new("trade_id", DataType::Utf8, true), // nullable
440            Field::new("ts_event", DataType::UInt64, false),
441            Field::new("ts_init", DataType::UInt64, false),
442        ];
443        let schema = Schema::new_with_metadata(fields, metadata.clone());
444
445        let record_batch = RecordBatch::try_new(
446            schema.into(),
447            vec![
448                Arc::new(price),
449                Arc::new(size),
450                Arc::new(aggressor_side),
451                Arc::new(trade_id),
452                Arc::new(ts_event),
453                Arc::new(ts_init),
454            ],
455        )
456        .unwrap();
457
458        let result = TradeTick::decode_batch(&metadata, record_batch);
459        assert!(result.is_err());
460        let err = result.unwrap_err();
461        assert!(
462            err.to_string().contains("NULL value at row 0"),
463            "Expected NULL error, was: {err}"
464        );
465    }
466
467    #[rstest]
468    fn test_decode_batch_invalid_price_returns_error() {
469        let instrument_id = InstrumentId::from("AAPL.XNAS");
470        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
471
472        let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
473        let price = FixedSizeBinaryArray::from(vec![&invalid_price.to_le_bytes()]);
474        let size = FixedSizeBinaryArray::from(vec![
475            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
476        ]);
477        let aggressor_side = UInt8Array::from(vec![0]);
478        let trade_id = StringArray::from(vec!["1"]);
479        let ts_event = UInt64Array::from(vec![1]);
480        let ts_init = UInt64Array::from(vec![2]);
481
482        let record_batch = RecordBatch::try_new(
483            TradeTick::get_schema(Some(metadata.clone())).into(),
484            vec![
485                Arc::new(price),
486                Arc::new(size),
487                Arc::new(aggressor_side),
488                Arc::new(trade_id),
489                Arc::new(ts_event),
490                Arc::new(ts_init),
491            ],
492        )
493        .unwrap();
494
495        let result = TradeTick::decode_batch(&metadata, record_batch);
496        assert!(result.is_err());
497        let err = result.unwrap_err();
498        assert!(
499            err.to_string().contains("price") && err.to_string().contains("row 0"),
500            "Expected price error at row 0, was: {err}"
501        );
502    }
503
504    #[rstest]
505    fn test_decode_batch_invalid_size_returns_error() {
506        use nautilus_model::types::quantity::QUANTITY_RAW_MAX;
507
508        let instrument_id = InstrumentId::from("AAPL.XNAS");
509        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
510
511        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
512        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
513
514        let invalid_size = QUANTITY_RAW_MAX + 1;
515        let size = FixedSizeBinaryArray::from(vec![&invalid_size.to_le_bytes()]);
516        let aggressor_side = UInt8Array::from(vec![0]);
517        let trade_id = StringArray::from(vec!["1"]);
518        let ts_event = UInt64Array::from(vec![1]);
519        let ts_init = UInt64Array::from(vec![2]);
520
521        let record_batch = RecordBatch::try_new(
522            TradeTick::get_schema(Some(metadata.clone())).into(),
523            vec![
524                Arc::new(price),
525                Arc::new(size),
526                Arc::new(aggressor_side),
527                Arc::new(trade_id),
528                Arc::new(ts_event),
529                Arc::new(ts_init),
530            ],
531        )
532        .unwrap();
533
534        let result = TradeTick::decode_batch(&metadata, record_batch);
535        assert!(result.is_err());
536        let err = result.unwrap_err();
537        assert!(
538            err.to_string().contains("size") && err.to_string().contains("row 0"),
539            "Expected size error at row 0, was: {err}"
540        );
541    }
542
543    #[rstest]
544    fn test_decode_batch_invalid_aggressor_side_returns_error() {
545        let instrument_id = InstrumentId::from("AAPL.XNAS");
546        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
547
548        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
549        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
550        let size = FixedSizeBinaryArray::from(vec![
551            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
552        ]);
553
554        let aggressor_side = UInt8Array::from(vec![99]);
555        let trade_id = StringArray::from(vec!["1"]);
556        let ts_event = UInt64Array::from(vec![1]);
557        let ts_init = UInt64Array::from(vec![2]);
558
559        let record_batch = RecordBatch::try_new(
560            TradeTick::get_schema(Some(metadata.clone())).into(),
561            vec![
562                Arc::new(price),
563                Arc::new(size),
564                Arc::new(aggressor_side),
565                Arc::new(trade_id),
566                Arc::new(ts_event),
567                Arc::new(ts_init),
568            ],
569        )
570        .unwrap();
571
572        let result = TradeTick::decode_batch(&metadata, record_batch);
573        assert!(result.is_err());
574        let err = result.unwrap_err();
575        assert!(
576            err.to_string().contains("AggressorSide"),
577            "Expected AggressorSide error, was: {err}"
578        );
579    }
580
581    #[rstest]
582    fn test_decode_batch_missing_instrument_id_returns_error() {
583        let instrument_id = InstrumentId::from("AAPL.XNAS");
584        let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
585        metadata.remove(KEY_INSTRUMENT_ID);
586
587        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
588        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
589        let size = FixedSizeBinaryArray::from(vec![
590            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
591        ]);
592        let aggressor_side = UInt8Array::from(vec![0]);
593        let trade_id = StringArray::from(vec!["1"]);
594        let ts_event = UInt64Array::from(vec![1]);
595        let ts_init = UInt64Array::from(vec![2]);
596
597        let record_batch = RecordBatch::try_new(
598            TradeTick::get_schema(Some(metadata.clone())).into(),
599            vec![
600                Arc::new(price),
601                Arc::new(size),
602                Arc::new(aggressor_side),
603                Arc::new(trade_id),
604                Arc::new(ts_event),
605                Arc::new(ts_init),
606            ],
607        )
608        .unwrap();
609
610        let result = TradeTick::decode_batch(&metadata, record_batch);
611        assert!(result.is_err());
612        let err = result.unwrap_err();
613        assert!(
614            err.to_string().contains("instrument_id"),
615            "Expected missing instrument_id error, was: {err}"
616        );
617    }
618
619    #[rstest]
620    fn test_decode_batch_missing_price_precision_returns_error() {
621        let instrument_id = InstrumentId::from("AAPL.XNAS");
622        let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
623        metadata.remove(KEY_PRICE_PRECISION);
624
625        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
626        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
627        let size = FixedSizeBinaryArray::from(vec![
628            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
629        ]);
630        let aggressor_side = UInt8Array::from(vec![0]);
631        let trade_id = StringArray::from(vec!["1"]);
632        let ts_event = UInt64Array::from(vec![1]);
633        let ts_init = UInt64Array::from(vec![2]);
634
635        let record_batch = RecordBatch::try_new(
636            TradeTick::get_schema(Some(metadata.clone())).into(),
637            vec![
638                Arc::new(price),
639                Arc::new(size),
640                Arc::new(aggressor_side),
641                Arc::new(trade_id),
642                Arc::new(ts_event),
643                Arc::new(ts_init),
644            ],
645        )
646        .unwrap();
647
648        let result = TradeTick::decode_batch(&metadata, record_batch);
649        assert!(result.is_err());
650        let err = result.unwrap_err();
651        assert!(
652            err.to_string().contains("price_precision"),
653            "Expected missing price_precision error, was: {err}"
654        );
655    }
656
657    #[rstest]
658    fn test_encode_decode_round_trip() {
659        let instrument_id = InstrumentId::from("AAPL.XNAS");
660        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
661
662        let tick1 = TradeTick {
663            instrument_id,
664            price: Price::from("100.10"),
665            size: Quantity::from(1000),
666            aggressor_side: AggressorSide::Buyer,
667            trade_id: TradeId::new("trade-123"),
668            ts_event: 1_000_000_000.into(),
669            ts_init: 1_000_000_001.into(),
670        };
671
672        let tick2 = TradeTick {
673            instrument_id,
674            price: Price::from("100.50"),
675            size: Quantity::from(500),
676            aggressor_side: AggressorSide::Seller,
677            trade_id: TradeId::new("trade-456"),
678            ts_event: 2_000_000_000.into(),
679            ts_init: 2_000_000_001.into(),
680        };
681
682        let original = vec![tick1, tick2];
683        let record_batch = TradeTick::encode_batch(&metadata, &original).unwrap();
684        let decoded = TradeTick::decode_batch(&metadata, record_batch).unwrap();
685
686        assert_eq!(decoded.len(), original.len());
687        for (orig, dec) in original.iter().zip(decoded.iter()) {
688            assert_eq!(dec.instrument_id, orig.instrument_id);
689            assert_eq!(dec.price, orig.price);
690            assert_eq!(dec.size, orig.size);
691            assert_eq!(dec.aggressor_side, orig.aggressor_side);
692            assert_eq!(dec.trade_id, orig.trade_id);
693            assert_eq!(dec.ts_event, orig.ts_event);
694            assert_eq!(dec.ts_init, orig.ts_init);
695        }
696    }
697}