1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{
20 FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray, StringBuilder, StringViewArray,
21 UInt8Array, UInt64Array,
22 },
23 datatypes::{DataType, Field, Schema},
24 error::ArrowError,
25 record_batch::RecordBatch,
26};
27use nautilus_model::{
28 data::TradeTick,
29 enums::AggressorSide,
30 identifiers::{InstrumentId, TradeId},
31 types::fixed::PRECISION_BYTES,
32};
33
34use super::{
35 DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
36 KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column, validate_precision_bytes,
37};
38use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
39
40impl ArrowSchemaProvider for TradeTick {
41 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
42 let fields = vec![
43 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
45 Field::new("aggressor_side", DataType::UInt8, false),
46 Field::new("trade_id", DataType::Utf8, false),
47 Field::new("ts_event", DataType::UInt64, false),
48 Field::new("ts_init", DataType::UInt64, false),
49 ];
50
51 match metadata {
52 Some(metadata) => Schema::new_with_metadata(fields, metadata),
53 None => Schema::new(fields),
54 }
55 }
56}
57
58fn parse_metadata(
59 metadata: &HashMap<String, String>,
60) -> Result<(InstrumentId, u8, u8), EncodingError> {
61 let instrument_id_str = metadata
62 .get(KEY_INSTRUMENT_ID)
63 .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
64 let instrument_id = InstrumentId::from_str(instrument_id_str)
65 .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
66
67 let price_precision = metadata
68 .get(KEY_PRICE_PRECISION)
69 .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
70 .parse::<u8>()
71 .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
72
73 let size_precision = metadata
74 .get(KEY_SIZE_PRECISION)
75 .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
76 .parse::<u8>()
77 .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
78
79 Ok((instrument_id, price_precision, size_precision))
80}
81
82impl EncodeToRecordBatch for TradeTick {
83 fn encode_batch(
84 metadata: &HashMap<String, String>,
85 data: &[Self],
86 ) -> Result<RecordBatch, ArrowError> {
87 let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
88 let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
89
90 let mut aggressor_side_builder = UInt8Array::builder(data.len());
91 let mut trade_id_builder = StringBuilder::new();
92 let mut ts_event_builder = UInt64Array::builder(data.len());
93 let mut ts_init_builder = UInt64Array::builder(data.len());
94
95 for tick in data {
96 price_builder
97 .append_value(tick.price.raw.to_le_bytes())
98 .unwrap();
99 size_builder
100 .append_value(tick.size.raw.to_le_bytes())
101 .unwrap();
102 aggressor_side_builder.append_value(tick.aggressor_side as u8);
103 trade_id_builder.append_value(tick.trade_id.to_string());
104 ts_event_builder.append_value(tick.ts_event.as_u64());
105 ts_init_builder.append_value(tick.ts_init.as_u64());
106 }
107
108 let price_array = Arc::new(price_builder.finish());
109 let size_array = Arc::new(size_builder.finish());
110 let aggressor_side_array = Arc::new(aggressor_side_builder.finish());
111 let trade_id_array = Arc::new(trade_id_builder.finish());
112 let ts_event_array = Arc::new(ts_event_builder.finish());
113 let ts_init_array = Arc::new(ts_init_builder.finish());
114
115 RecordBatch::try_new(
116 Self::get_schema(Some(metadata.clone())).into(),
117 vec![
118 price_array,
119 size_array,
120 aggressor_side_array,
121 trade_id_array,
122 ts_event_array,
123 ts_init_array,
124 ],
125 )
126 }
127
128 fn metadata(&self) -> HashMap<String, String> {
129 Self::get_metadata(
130 &self.instrument_id,
131 self.price.precision,
132 self.size.precision,
133 )
134 }
135}
136
137impl DecodeFromRecordBatch for TradeTick {
138 fn decode_batch(
139 metadata: &HashMap<String, String>,
140 record_batch: RecordBatch,
141 ) -> Result<Vec<Self>, EncodingError> {
142 let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
143 let cols = record_batch.columns();
144
145 let price_values = extract_column::<FixedSizeBinaryArray>(
146 cols,
147 "price",
148 0,
149 DataType::FixedSizeBinary(PRECISION_BYTES),
150 )?;
151
152 let size_values = extract_column::<FixedSizeBinaryArray>(
153 cols,
154 "size",
155 1,
156 DataType::FixedSizeBinary(PRECISION_BYTES),
157 )?;
158
159 validate_precision_bytes(price_values, "price")?;
160 validate_precision_bytes(size_values, "size")?;
161
162 let aggressor_side_values =
163 extract_column::<UInt8Array>(cols, "aggressor_side", 2, DataType::UInt8)?;
164 let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 4, DataType::UInt64)?;
165 let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 5, DataType::UInt64)?;
166
167 let trade_id_values: Vec<TradeId> = if record_batch
169 .schema()
170 .field_with_name("trade_id")?
171 .data_type()
172 == &DataType::Utf8View
173 {
174 extract_column::<StringViewArray>(cols, "trade_id", 3, DataType::Utf8View)?
175 .iter()
176 .enumerate()
177 .map(|(i, id)| {
178 id.map(TradeId::from).ok_or_else(|| {
179 EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
180 })
181 })
182 .collect::<Result<Vec<_>, _>>()?
183 } else {
184 extract_column::<StringArray>(cols, "trade_id", 3, DataType::Utf8)?
185 .iter()
186 .enumerate()
187 .map(|(i, id)| {
188 id.map(TradeId::from).ok_or_else(|| {
189 EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
190 })
191 })
192 .collect::<Result<Vec<_>, _>>()?
193 };
194
195 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
196 .map(|i| {
197 let price = decode_price(price_values.value(i), price_precision, "price", i)?;
198 let size = decode_quantity(size_values.value(i), size_precision, "size", i)?;
199 let aggressor_side_value = aggressor_side_values.value(i);
200 let aggressor_side = AggressorSide::from_repr(aggressor_side_value as usize)
201 .ok_or_else(|| {
202 EncodingError::ParseError(
203 stringify!(AggressorSide),
204 format!("Invalid enum value, was {aggressor_side_value}"),
205 )
206 })?;
207 let trade_id = trade_id_values[i];
208 let ts_event = ts_event_values.value(i).into();
209 let ts_init = ts_init_values.value(i).into();
210
211 Ok(Self {
212 instrument_id,
213 price,
214 size,
215 aggressor_side,
216 trade_id,
217 ts_event,
218 ts_init,
219 })
220 })
221 .collect();
222
223 result
224 }
225}
226
227impl DecodeDataFromRecordBatch for TradeTick {
228 fn decode_data_batch(
229 metadata: &HashMap<String, String>,
230 record_batch: RecordBatch,
231 ) -> Result<Vec<Data>, EncodingError> {
232 let ticks: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
233 Ok(ticks.into_iter().map(Data::from).collect())
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use std::sync::Arc;
240
241 use arrow::{
242 array::{Array, FixedSizeBinaryArray, UInt8Array, UInt64Array},
243 record_batch::RecordBatch,
244 };
245 use nautilus_model::types::{
246 Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw,
247 };
248 use rstest::rstest;
249
250 use super::*;
251 use crate::arrow::{get_raw_price, get_raw_quantity};
252
253 #[rstest]
254 fn test_get_schema() {
255 let instrument_id = InstrumentId::from("AAPL.XNAS");
256 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
257 let schema = TradeTick::get_schema(Some(metadata.clone()));
258
259 let mut expected_fields = Vec::with_capacity(6);
260
261 expected_fields.push(Field::new(
262 "price",
263 DataType::FixedSizeBinary(PRECISION_BYTES),
264 false,
265 ));
266
267 expected_fields.extend(vec![
268 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
269 Field::new("aggressor_side", DataType::UInt8, false),
270 Field::new("trade_id", DataType::Utf8, false),
271 Field::new("ts_event", DataType::UInt64, false),
272 Field::new("ts_init", DataType::UInt64, false),
273 ]);
274
275 let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
276 assert_eq!(schema, expected_schema);
277 }
278
279 #[rstest]
280 fn test_get_schema_map() {
281 let schema_map = TradeTick::get_schema_map();
282 let mut expected_map = HashMap::new();
283
284 let precision_bytes = format!("FixedSizeBinary({PRECISION_BYTES})");
285 expected_map.insert("price".to_string(), precision_bytes.clone());
286 expected_map.insert("size".to_string(), precision_bytes);
287 expected_map.insert("aggressor_side".to_string(), "UInt8".to_string());
288 expected_map.insert("trade_id".to_string(), "Utf8".to_string());
289 expected_map.insert("ts_event".to_string(), "UInt64".to_string());
290 expected_map.insert("ts_init".to_string(), "UInt64".to_string());
291 assert_eq!(schema_map, expected_map);
292 }
293
294 #[rstest]
295 fn test_encode_trade_tick() {
296 let instrument_id = InstrumentId::from("AAPL.XNAS");
297 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
298
299 let tick1 = TradeTick {
300 instrument_id,
301 price: Price::from("100.10"),
302 size: Quantity::from(1000),
303 aggressor_side: AggressorSide::Buyer,
304 trade_id: TradeId::new("1"),
305 ts_event: 1.into(),
306 ts_init: 3.into(),
307 };
308
309 let tick2 = TradeTick {
310 instrument_id,
311 price: Price::from("100.50"),
312 size: Quantity::from(500),
313 aggressor_side: AggressorSide::Seller,
314 trade_id: TradeId::new("2"),
315 ts_event: 2.into(),
316 ts_init: 4.into(),
317 };
318
319 let data = vec![tick1, tick2];
320 let record_batch = TradeTick::encode_batch(&metadata, &data).unwrap();
321 let columns = record_batch.columns();
322
323 let price_values = columns[0]
324 .as_any()
325 .downcast_ref::<FixedSizeBinaryArray>()
326 .unwrap();
327 assert_eq!(
328 get_raw_price(price_values.value(0)),
329 (100.10 * FIXED_SCALAR) as PriceRaw
330 );
331 assert_eq!(
332 get_raw_price(price_values.value(1)),
333 (100.50 * FIXED_SCALAR) as PriceRaw
334 );
335
336 let size_values = columns[1]
337 .as_any()
338 .downcast_ref::<FixedSizeBinaryArray>()
339 .unwrap();
340 assert_eq!(
341 get_raw_quantity(size_values.value(0)),
342 (1000.0 * FIXED_SCALAR) as QuantityRaw
343 );
344 assert_eq!(
345 get_raw_quantity(size_values.value(1)),
346 (500.0 * FIXED_SCALAR) as QuantityRaw
347 );
348
349 let aggressor_side_values = columns[2].as_any().downcast_ref::<UInt8Array>().unwrap();
350 let trade_id_values = columns[3].as_any().downcast_ref::<StringArray>().unwrap();
351 let ts_event_values = columns[4].as_any().downcast_ref::<UInt64Array>().unwrap();
352 let ts_init_values = columns[5].as_any().downcast_ref::<UInt64Array>().unwrap();
353
354 assert_eq!(columns.len(), 6);
355 assert_eq!(size_values.len(), 2);
356 assert_eq!(
357 get_raw_quantity(size_values.value(0)),
358 (1000.0 * FIXED_SCALAR) as QuantityRaw
359 );
360 assert_eq!(
361 get_raw_quantity(size_values.value(1)),
362 (500.0 * FIXED_SCALAR) as QuantityRaw
363 );
364 assert_eq!(aggressor_side_values.len(), 2);
365 assert_eq!(aggressor_side_values.value(0), 1);
366 assert_eq!(aggressor_side_values.value(1), 2);
367 assert_eq!(trade_id_values.len(), 2);
368 assert_eq!(trade_id_values.value(0), "1");
369 assert_eq!(trade_id_values.value(1), "2");
370 assert_eq!(ts_event_values.len(), 2);
371 assert_eq!(ts_event_values.value(0), 1);
372 assert_eq!(ts_event_values.value(1), 2);
373 assert_eq!(ts_init_values.len(), 2);
374 assert_eq!(ts_init_values.value(0), 3);
375 assert_eq!(ts_init_values.value(1), 4);
376 }
377
378 #[rstest]
379 fn test_decode_batch() {
380 let instrument_id = InstrumentId::from("AAPL.XNAS");
381 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
382
383 let raw_price1 = (100.00 * FIXED_SCALAR) as PriceRaw;
384 let raw_price2 = (101.00 * FIXED_SCALAR) as PriceRaw;
385 let price =
386 FixedSizeBinaryArray::from(vec![&raw_price1.to_le_bytes(), &raw_price2.to_le_bytes()]);
387
388 let size = FixedSizeBinaryArray::from(vec![
389 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
390 &((900.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
391 ]);
392 let aggressor_side = UInt8Array::from(vec![0, 1]); let trade_id = StringArray::from(vec!["1", "2"]);
394 let ts_event = UInt64Array::from(vec![1, 2]);
395 let ts_init = UInt64Array::from(vec![3, 4]);
396
397 let record_batch = RecordBatch::try_new(
398 TradeTick::get_schema(Some(metadata.clone())).into(),
399 vec![
400 Arc::new(price),
401 Arc::new(size),
402 Arc::new(aggressor_side),
403 Arc::new(trade_id),
404 Arc::new(ts_event),
405 Arc::new(ts_init),
406 ],
407 )
408 .unwrap();
409
410 let decoded_data = TradeTick::decode_batch(&metadata, record_batch).unwrap();
411 assert_eq!(decoded_data.len(), 2);
412 assert_eq!(decoded_data[0].price, Price::from_raw(raw_price1, 2));
413 assert_eq!(decoded_data[1].price, Price::from_raw(raw_price2, 2));
414 }
415
416 #[rstest]
417 fn test_decode_batch_null_trade_id_returns_error() {
418 use arrow::datatypes::Field;
419
420 let instrument_id = InstrumentId::from("AAPL.XNAS");
421 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
422
423 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
424 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
425 let size = FixedSizeBinaryArray::from(vec![
426 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
427 ]);
428 let aggressor_side = UInt8Array::from(vec![0]);
429
430 let trade_id: StringArray = vec![None::<&str>].into();
431 let ts_event = UInt64Array::from(vec![1]);
432 let ts_init = UInt64Array::from(vec![2]);
433
434 let fields = vec![
436 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
437 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
438 Field::new("aggressor_side", DataType::UInt8, false),
439 Field::new("trade_id", DataType::Utf8, true), Field::new("ts_event", DataType::UInt64, false),
441 Field::new("ts_init", DataType::UInt64, false),
442 ];
443 let schema = Schema::new_with_metadata(fields, metadata.clone());
444
445 let record_batch = RecordBatch::try_new(
446 schema.into(),
447 vec![
448 Arc::new(price),
449 Arc::new(size),
450 Arc::new(aggressor_side),
451 Arc::new(trade_id),
452 Arc::new(ts_event),
453 Arc::new(ts_init),
454 ],
455 )
456 .unwrap();
457
458 let result = TradeTick::decode_batch(&metadata, record_batch);
459 assert!(result.is_err());
460 let err = result.unwrap_err();
461 assert!(
462 err.to_string().contains("NULL value at row 0"),
463 "Expected NULL error, was: {err}"
464 );
465 }
466
467 #[rstest]
468 fn test_decode_batch_invalid_price_returns_error() {
469 let instrument_id = InstrumentId::from("AAPL.XNAS");
470 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
471
472 let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
473 let price = FixedSizeBinaryArray::from(vec![&invalid_price.to_le_bytes()]);
474 let size = FixedSizeBinaryArray::from(vec![
475 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
476 ]);
477 let aggressor_side = UInt8Array::from(vec![0]);
478 let trade_id = StringArray::from(vec!["1"]);
479 let ts_event = UInt64Array::from(vec![1]);
480 let ts_init = UInt64Array::from(vec![2]);
481
482 let record_batch = RecordBatch::try_new(
483 TradeTick::get_schema(Some(metadata.clone())).into(),
484 vec![
485 Arc::new(price),
486 Arc::new(size),
487 Arc::new(aggressor_side),
488 Arc::new(trade_id),
489 Arc::new(ts_event),
490 Arc::new(ts_init),
491 ],
492 )
493 .unwrap();
494
495 let result = TradeTick::decode_batch(&metadata, record_batch);
496 assert!(result.is_err());
497 let err = result.unwrap_err();
498 assert!(
499 err.to_string().contains("price") && err.to_string().contains("row 0"),
500 "Expected price error at row 0, was: {err}"
501 );
502 }
503
504 #[rstest]
505 fn test_decode_batch_invalid_size_returns_error() {
506 use nautilus_model::types::quantity::QUANTITY_RAW_MAX;
507
508 let instrument_id = InstrumentId::from("AAPL.XNAS");
509 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
510
511 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
512 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
513
514 let invalid_size = QUANTITY_RAW_MAX + 1;
515 let size = FixedSizeBinaryArray::from(vec![&invalid_size.to_le_bytes()]);
516 let aggressor_side = UInt8Array::from(vec![0]);
517 let trade_id = StringArray::from(vec!["1"]);
518 let ts_event = UInt64Array::from(vec![1]);
519 let ts_init = UInt64Array::from(vec![2]);
520
521 let record_batch = RecordBatch::try_new(
522 TradeTick::get_schema(Some(metadata.clone())).into(),
523 vec![
524 Arc::new(price),
525 Arc::new(size),
526 Arc::new(aggressor_side),
527 Arc::new(trade_id),
528 Arc::new(ts_event),
529 Arc::new(ts_init),
530 ],
531 )
532 .unwrap();
533
534 let result = TradeTick::decode_batch(&metadata, record_batch);
535 assert!(result.is_err());
536 let err = result.unwrap_err();
537 assert!(
538 err.to_string().contains("size") && err.to_string().contains("row 0"),
539 "Expected size error at row 0, was: {err}"
540 );
541 }
542
543 #[rstest]
544 fn test_decode_batch_invalid_aggressor_side_returns_error() {
545 let instrument_id = InstrumentId::from("AAPL.XNAS");
546 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
547
548 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
549 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
550 let size = FixedSizeBinaryArray::from(vec![
551 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
552 ]);
553
554 let aggressor_side = UInt8Array::from(vec![99]);
555 let trade_id = StringArray::from(vec!["1"]);
556 let ts_event = UInt64Array::from(vec![1]);
557 let ts_init = UInt64Array::from(vec![2]);
558
559 let record_batch = RecordBatch::try_new(
560 TradeTick::get_schema(Some(metadata.clone())).into(),
561 vec![
562 Arc::new(price),
563 Arc::new(size),
564 Arc::new(aggressor_side),
565 Arc::new(trade_id),
566 Arc::new(ts_event),
567 Arc::new(ts_init),
568 ],
569 )
570 .unwrap();
571
572 let result = TradeTick::decode_batch(&metadata, record_batch);
573 assert!(result.is_err());
574 let err = result.unwrap_err();
575 assert!(
576 err.to_string().contains("AggressorSide"),
577 "Expected AggressorSide error, was: {err}"
578 );
579 }
580
581 #[rstest]
582 fn test_decode_batch_missing_instrument_id_returns_error() {
583 let instrument_id = InstrumentId::from("AAPL.XNAS");
584 let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
585 metadata.remove(KEY_INSTRUMENT_ID);
586
587 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
588 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
589 let size = FixedSizeBinaryArray::from(vec![
590 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
591 ]);
592 let aggressor_side = UInt8Array::from(vec![0]);
593 let trade_id = StringArray::from(vec!["1"]);
594 let ts_event = UInt64Array::from(vec![1]);
595 let ts_init = UInt64Array::from(vec![2]);
596
597 let record_batch = RecordBatch::try_new(
598 TradeTick::get_schema(Some(metadata.clone())).into(),
599 vec![
600 Arc::new(price),
601 Arc::new(size),
602 Arc::new(aggressor_side),
603 Arc::new(trade_id),
604 Arc::new(ts_event),
605 Arc::new(ts_init),
606 ],
607 )
608 .unwrap();
609
610 let result = TradeTick::decode_batch(&metadata, record_batch);
611 assert!(result.is_err());
612 let err = result.unwrap_err();
613 assert!(
614 err.to_string().contains("instrument_id"),
615 "Expected missing instrument_id error, was: {err}"
616 );
617 }
618
619 #[rstest]
620 fn test_decode_batch_missing_price_precision_returns_error() {
621 let instrument_id = InstrumentId::from("AAPL.XNAS");
622 let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
623 metadata.remove(KEY_PRICE_PRECISION);
624
625 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
626 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
627 let size = FixedSizeBinaryArray::from(vec![
628 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
629 ]);
630 let aggressor_side = UInt8Array::from(vec![0]);
631 let trade_id = StringArray::from(vec!["1"]);
632 let ts_event = UInt64Array::from(vec![1]);
633 let ts_init = UInt64Array::from(vec![2]);
634
635 let record_batch = RecordBatch::try_new(
636 TradeTick::get_schema(Some(metadata.clone())).into(),
637 vec![
638 Arc::new(price),
639 Arc::new(size),
640 Arc::new(aggressor_side),
641 Arc::new(trade_id),
642 Arc::new(ts_event),
643 Arc::new(ts_init),
644 ],
645 )
646 .unwrap();
647
648 let result = TradeTick::decode_batch(&metadata, record_batch);
649 assert!(result.is_err());
650 let err = result.unwrap_err();
651 assert!(
652 err.to_string().contains("price_precision"),
653 "Expected missing price_precision error, was: {err}"
654 );
655 }
656
657 #[rstest]
658 fn test_encode_decode_round_trip() {
659 let instrument_id = InstrumentId::from("AAPL.XNAS");
660 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
661
662 let tick1 = TradeTick {
663 instrument_id,
664 price: Price::from("100.10"),
665 size: Quantity::from(1000),
666 aggressor_side: AggressorSide::Buyer,
667 trade_id: TradeId::new("trade-123"),
668 ts_event: 1_000_000_000.into(),
669 ts_init: 1_000_000_001.into(),
670 };
671
672 let tick2 = TradeTick {
673 instrument_id,
674 price: Price::from("100.50"),
675 size: Quantity::from(500),
676 aggressor_side: AggressorSide::Seller,
677 trade_id: TradeId::new("trade-456"),
678 ts_event: 2_000_000_000.into(),
679 ts_init: 2_000_000_001.into(),
680 };
681
682 let original = vec![tick1, tick2];
683 let record_batch = TradeTick::encode_batch(&metadata, &original).unwrap();
684 let decoded = TradeTick::decode_batch(&metadata, record_batch).unwrap();
685
686 assert_eq!(decoded.len(), original.len());
687 for (orig, dec) in original.iter().zip(decoded.iter()) {
688 assert_eq!(dec.instrument_id, orig.instrument_id);
689 assert_eq!(dec.price, orig.price);
690 assert_eq!(dec.size, orig.size);
691 assert_eq!(dec.aggressor_side, orig.aggressor_side);
692 assert_eq!(dec.trade_id, orig.trade_id);
693 assert_eq!(dec.ts_event, orig.ts_event);
694 assert_eq!(dec.ts_init, orig.ts_init);
695 }
696 }
697}