1use std::{io::Read, path::Path};
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23 data::{delta::OrderBookDelta, order::BookOrder},
24 enums::{BookAction, OrderSide, RecordFlag},
25 identifiers::InstrumentId,
26 types::{Price, Quantity},
27};
28
/// Decimal precision of every `Price` produced by this parser.
///
/// `convert_price` divides the raw ITCH `Price4` value by 10_000, i.e. four decimal places.
const PRICE_PRECISION: u8 = 4;

/// Decimal precision of every `Quantity` produced by this parser.
///
/// Sizes are built from whole `u32` share counts, so no fractional digits are kept.
const SIZE_PRECISION: u8 = 0;
34
/// Parser-side record of a resting order, keyed in `ItchParser::orders` by its ITCH
/// reference number.
///
/// Messages that follow an add (delete, cancel, execution, replace) carry only the
/// reference number, so the price, side and remaining size are remembered here.
#[derive(Debug)]
struct OrderState {
    /// Price the order was added (or replaced) at.
    price: Price,
    /// Remaining open shares; reduced by cancels and executions.
    size: u32,
    /// Side of the book the order rests on.
    side: OrderSide,
}
41
/// Stateful parser that turns an ITCH message stream into `OrderBookDelta`s for a
/// single stock.
#[derive(Debug)]
pub struct ItchParser {
    /// Instrument ID stamped on every emitted delta.
    instrument_id: InstrumentId,
    /// Stock-locate code of the target stock; `None` until a stock directory message
    /// whose symbol equals `target_stock` has been seen.
    target_locate: Option<u16>,
    /// Symbol compared against the (trimmed) symbol of each stock directory message.
    target_stock: String,
    /// Offset in nanoseconds added to each ITCH message timestamp to form the delta
    /// timestamps.
    base_ns: u64,
    /// Orders currently tracked for the target stock, keyed by ITCH reference number.
    orders: AHashMap<u64, OrderState>,
    /// Sequence counter; incremented before each emitted delta.
    sequence: u64,
}
56
57impl ItchParser {
58 pub fn new(instrument_id: InstrumentId, stock: &str, base_ns: u64) -> Self {
67 Self {
68 instrument_id,
69 target_locate: None,
70 target_stock: stock.to_string(),
71 base_ns,
72 orders: AHashMap::new(),
73 sequence: 0,
74 }
75 }
76
77 pub fn parse_gzip_file(&mut self, path: &Path) -> anyhow::Result<Vec<OrderBookDelta>> {
84 let stream = itchy::MessageStream::from_gzip(path)
85 .map_err(|e| anyhow::anyhow!("Failed to open ITCH gzip: {e}"))?;
86 self.parse_stream(stream)
87 }
88
89 pub fn parse_reader<R: Read>(&mut self, reader: R) -> anyhow::Result<Vec<OrderBookDelta>> {
95 let stream = itchy::MessageStream::from_reader(reader);
96 self.parse_stream(stream)
97 }
98
99 fn parse_stream<R: Read>(
100 &mut self,
101 stream: itchy::MessageStream<R>,
102 ) -> anyhow::Result<Vec<OrderBookDelta>> {
103 let mut deltas = Vec::new();
104
105 for result in stream {
106 let msg = result.map_err(|e| anyhow::anyhow!("ITCH parse error: {e}"))?;
107
108 match msg.body {
110 itchy::Body::StockDirectory(ref dir) => {
111 let symbol = dir.stock.trim();
112 if symbol == self.target_stock {
113 self.target_locate = Some(msg.stock_locate);
114 }
115 continue;
116 }
117 itchy::Body::SystemEvent {
118 event: itchy::EventCode::EndOfMessages,
119 } => {
120 let ts = UnixNanos::from(self.base_ns + msg.timestamp);
121 self.handle_end_of_messages(ts, &mut deltas);
122 continue;
123 }
124 _ => {}
125 }
126
127 let Some(locate) = self.target_locate else {
129 continue;
130 };
131 if msg.stock_locate != locate {
132 continue;
133 }
134
135 let ts = UnixNanos::from(self.base_ns + msg.timestamp);
136
137 match msg.body {
138 itchy::Body::AddOrder(ref add) => {
139 self.handle_add_order(add, ts, &mut deltas);
140 }
141 itchy::Body::DeleteOrder { reference } => {
142 self.handle_delete_order(reference, ts, &mut deltas);
143 }
144 itchy::Body::OrderCancelled {
145 reference,
146 cancelled,
147 } => {
148 self.handle_cancel(reference, cancelled, ts, &mut deltas);
149 }
150 itchy::Body::OrderExecuted {
151 reference,
152 executed,
153 ..
154 } => {
155 self.handle_execution(reference, executed, ts, &mut deltas);
156 }
157 itchy::Body::OrderExecutedWithPrice {
158 reference,
159 executed,
160 ..
161 } => {
162 self.handle_execution(reference, executed, ts, &mut deltas);
163 }
164 itchy::Body::ReplaceOrder(ref replace) => {
165 self.handle_replace(replace, ts, &mut deltas);
166 }
167 _ => {}
168 }
169 }
170
171 if let Some(last) = deltas.last_mut() {
173 last.flags |= RecordFlag::F_LAST as u8;
174 }
175
176 Ok(deltas)
177 }
178
179 fn handle_add_order(
180 &mut self,
181 add: &itchy::AddOrder,
182 ts: UnixNanos,
183 deltas: &mut Vec<OrderBookDelta>,
184 ) {
185 let side = convert_side(add.side);
186 let price = convert_price(add.price);
187
188 self.orders.insert(
189 add.reference,
190 OrderState {
191 price,
192 size: add.shares,
193 side,
194 },
195 );
196
197 self.sequence += 1;
198 let order = BookOrder::new(
199 side,
200 price,
201 Quantity::new(f64::from(add.shares), SIZE_PRECISION),
202 add.reference,
203 );
204 deltas.push(OrderBookDelta::new(
205 self.instrument_id,
206 BookAction::Add,
207 order,
208 RecordFlag::F_LAST as u8,
209 self.sequence,
210 ts,
211 ts,
212 ));
213 }
214
215 fn handle_delete_order(
216 &mut self,
217 reference: u64,
218 ts: UnixNanos,
219 deltas: &mut Vec<OrderBookDelta>,
220 ) {
221 if let Some(state) = self.orders.remove(&reference) {
222 self.sequence += 1;
223 let order = BookOrder::new(
224 state.side,
225 state.price,
226 Quantity::new(0.0, SIZE_PRECISION),
227 reference,
228 );
229 deltas.push(OrderBookDelta::new(
230 self.instrument_id,
231 BookAction::Delete,
232 order,
233 RecordFlag::F_LAST as u8,
234 self.sequence,
235 ts,
236 ts,
237 ));
238 }
239 }
240
241 fn handle_cancel(
242 &mut self,
243 reference: u64,
244 cancelled: u32,
245 ts: UnixNanos,
246 deltas: &mut Vec<OrderBookDelta>,
247 ) {
248 if let Some(state) = self.orders.get_mut(&reference) {
249 state.size = state.size.saturating_sub(cancelled);
250
251 if state.size == 0 {
252 let state = self.orders.remove(&reference).unwrap();
254 self.sequence += 1;
255 let order = BookOrder::new(
256 state.side,
257 state.price,
258 Quantity::new(0.0, SIZE_PRECISION),
259 reference,
260 );
261 deltas.push(OrderBookDelta::new(
262 self.instrument_id,
263 BookAction::Delete,
264 order,
265 RecordFlag::F_LAST as u8,
266 self.sequence,
267 ts,
268 ts,
269 ));
270 } else {
271 self.sequence += 1;
273 let order = BookOrder::new(
274 state.side,
275 state.price,
276 Quantity::new(f64::from(state.size), SIZE_PRECISION),
277 reference,
278 );
279 deltas.push(OrderBookDelta::new(
280 self.instrument_id,
281 BookAction::Update,
282 order,
283 RecordFlag::F_LAST as u8,
284 self.sequence,
285 ts,
286 ts,
287 ));
288 }
289 }
290 }
291
292 fn handle_execution(
293 &mut self,
294 reference: u64,
295 executed: u32,
296 ts: UnixNanos,
297 deltas: &mut Vec<OrderBookDelta>,
298 ) {
299 if let Some(state) = self.orders.get_mut(&reference) {
300 state.size = state.size.saturating_sub(executed);
301
302 if state.size == 0 {
303 let state = self.orders.remove(&reference).unwrap();
305 self.sequence += 1;
306 let order = BookOrder::new(
307 state.side,
308 state.price,
309 Quantity::new(0.0, SIZE_PRECISION),
310 reference,
311 );
312 deltas.push(OrderBookDelta::new(
313 self.instrument_id,
314 BookAction::Delete,
315 order,
316 RecordFlag::F_LAST as u8,
317 self.sequence,
318 ts,
319 ts,
320 ));
321 } else {
322 self.sequence += 1;
324 let order = BookOrder::new(
325 state.side,
326 state.price,
327 Quantity::new(f64::from(state.size), SIZE_PRECISION),
328 reference,
329 );
330 deltas.push(OrderBookDelta::new(
331 self.instrument_id,
332 BookAction::Update,
333 order,
334 RecordFlag::F_LAST as u8,
335 self.sequence,
336 ts,
337 ts,
338 ));
339 }
340 }
341 }
342
343 fn handle_replace(
344 &mut self,
345 replace: &itchy::ReplaceOrder,
346 ts: UnixNanos,
347 deltas: &mut Vec<OrderBookDelta>,
348 ) {
349 if let Some(old_state) = self.orders.remove(&replace.old_reference) {
351 self.sequence += 1;
352 let old_order = BookOrder::new(
353 old_state.side,
354 old_state.price,
355 Quantity::new(0.0, SIZE_PRECISION),
356 replace.old_reference,
357 );
358 deltas.push(OrderBookDelta::new(
359 self.instrument_id,
360 BookAction::Delete,
361 old_order,
362 0, self.sequence,
364 ts,
365 ts,
366 ));
367
368 let new_price = convert_price(replace.price);
370 self.orders.insert(
371 replace.new_reference,
372 OrderState {
373 price: new_price,
374 size: replace.shares,
375 side: old_state.side,
376 },
377 );
378
379 self.sequence += 1;
380 let new_order = BookOrder::new(
381 old_state.side,
382 new_price,
383 Quantity::new(f64::from(replace.shares), SIZE_PRECISION),
384 replace.new_reference,
385 );
386 deltas.push(OrderBookDelta::new(
387 self.instrument_id,
388 BookAction::Add,
389 new_order,
390 RecordFlag::F_LAST as u8,
391 self.sequence,
392 ts,
393 ts,
394 ));
395 }
396 }
397
398 fn handle_end_of_messages(&mut self, ts: UnixNanos, deltas: &mut Vec<OrderBookDelta>) {
399 self.sequence += 1;
400 deltas.push(OrderBookDelta::clear(
401 self.instrument_id,
402 self.sequence,
403 ts,
404 ts,
405 ));
406 }
407}
408
409fn convert_side(side: itchy::Side) -> OrderSide {
410 match side {
411 itchy::Side::Buy => OrderSide::Buy,
412 itchy::Side::Sell => OrderSide::Sell,
413 }
414}
415
416fn convert_price(price: itchy::Price4) -> Price {
417 Price::new(f64::from(price.raw()) / 10_000.0, PRICE_PRECISION)
418}
419
#[cfg(test)]
mod tests {
    use std::{fs, fs::File, path::PathBuf, sync::Arc};

    use nautilus_model::data::OrderBookDelta;
    use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
    use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
    use rstest::rstest;

    use super::*;

    const AAPL_ID: &str = "AAPL.XNAS";

    /// Bit mask of the `F_LAST` record flag.
    const F_LAST: u8 = RecordFlag::F_LAST as u8;

    /// Builds a parser targeting AAPL with the given timestamp base.
    fn setup_parser(base_ns: u64) -> ItchParser {
        ItchParser::new(InstrumentId::from(AAPL_ID), "AAPL", base_ns)
    }

    /// Right-pads `symbol` with ASCII spaces to the fixed 8-byte ITCH stock field.
    fn padded_symbol(symbol: &str) -> [u8; 8] {
        let mut field = [b' '; 8];
        field[..symbol.len()].copy_from_slice(symbol.as_bytes());
        field
    }

    /// Returns a stream that registers AAPL under locate 1, followed by `messages`.
    fn aapl_stream_with(messages: &[Vec<u8>]) -> Vec<u8> {
        let mut stream = build_stock_directory_msg(1, &padded_symbol("AAPL"));
        for message in messages {
            stream.extend_from_slice(message);
        }
        stream
    }

    /// Parses an AAPL stream made of `messages` with a zero timestamp base.
    fn parse_aapl(messages: &[Vec<u8>]) -> Vec<OrderBookDelta> {
        let bytes = aapl_stream_with(messages);
        let mut parser = setup_parser(0);
        parser.parse_reader(&bytes[..]).unwrap()
    }

    #[rstest]
    fn test_convert_side() {
        assert_eq!(convert_side(itchy::Side::Buy), OrderSide::Buy);
        assert_eq!(convert_side(itchy::Side::Sell), OrderSide::Sell);
    }

    #[rstest]
    fn test_convert_price() {
        let converted = convert_price(itchy::Price4::from(1_2345));
        assert_eq!(converted.as_f64(), 1.2345);
        assert_eq!(converted.precision, PRICE_PRECISION);
    }

    #[rstest]
    fn test_convert_price_whole_dollar() {
        let converted = convert_price(itchy::Price4::from(100_0000));
        assert_eq!(converted.as_f64(), 100.0);
    }

    #[rstest]
    fn test_convert_price_sub_penny() {
        let converted = convert_price(itchy::Price4::from(150_2501));
        assert_eq!(converted.as_f64(), 150.2501);
    }

    #[rstest]
    fn test_add_order() {
        let deltas = parse_aapl(&[build_add_order_msg(1, 42, b'B', 100, 1_502_500)]);

        assert_eq!(deltas.len(), 1);
        let added = &deltas[0];
        assert_eq!(added.action, BookAction::Add);
        assert_eq!(added.order.side, OrderSide::Buy);
        assert_eq!(added.order.price.as_f64(), 150.25);
        assert_eq!(added.order.size.as_f64(), 100.0);
        assert_eq!(added.order.order_id, 42);
    }

    #[rstest]
    fn test_delete_order() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_delete_order_msg(1, 42),
        ]);

        assert_eq!(deltas.len(), 2);
        let deleted = &deltas[1];
        assert_eq!(deleted.action, BookAction::Delete);
        assert_eq!(deleted.order.order_id, 42);
        assert_eq!(deleted.order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_delete_unknown_order_is_ignored() {
        let deltas = parse_aapl(&[build_delete_order_msg(1, 999)]);

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_partial_cancel_reduces_size() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_cancelled_msg(1, 42, 30),
        ]);

        assert_eq!(deltas.len(), 2);
        let updated = &deltas[1];
        assert_eq!(updated.action, BookAction::Update);
        assert_eq!(updated.order.size.as_f64(), 70.0);
        assert_eq!(updated.order.price.as_f64(), 150.0);
        assert_eq!(updated.order.order_id, 42);
    }

    #[rstest]
    fn test_full_cancel_deletes_order() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_order_cancelled_msg(1, 42, 100),
        ]);

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_cancel_unknown_order_is_ignored() {
        let deltas = parse_aapl(&[build_order_cancelled_msg(1, 999, 50)]);

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_partial_execution_updates_size() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_msg(1, 42, 40, 1001),
        ]);

        assert_eq!(deltas.len(), 2);
        let updated = &deltas[1];
        assert_eq!(updated.action, BookAction::Update);
        assert_eq!(updated.order.size.as_f64(), 60.0);
        assert_eq!(updated.order.order_id, 42);
    }

    #[rstest]
    fn test_full_execution_deletes_order() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_order_executed_msg(1, 42, 100, 1001),
        ]);

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_multiple_partial_executions_then_full() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_msg(1, 42, 30, 1001),
            build_order_executed_msg(1, 42, 30, 1002),
            build_order_executed_msg(1, 42, 40, 1003),
        ]);

        assert_eq!(deltas.len(), 4);
        assert_eq!(deltas[1].action, BookAction::Update);
        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
        assert_eq!(deltas[2].action, BookAction::Update);
        assert_eq!(deltas[2].order.size.as_f64(), 40.0);
        assert_eq!(deltas[3].action, BookAction::Delete);
        assert_eq!(deltas[3].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_executed_with_price_partial() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_with_price_msg(1, 42, 25, 2001, 1_505_000),
        ]);

        assert_eq!(deltas.len(), 2);
        let updated = &deltas[1];
        assert_eq!(updated.action, BookAction::Update);
        assert_eq!(updated.order.size.as_f64(), 75.0);
        // The resting order keeps its own price, not the execution price.
        assert_eq!(updated.order.price.as_f64(), 150.0);
    }

    #[rstest]
    fn test_execution_unknown_order_is_ignored() {
        let deltas = parse_aapl(&[build_order_executed_msg(1, 999, 50, 1001)]);

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_replace_order() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_replace_order_msg(1, 42, 43, 150, 1_510_000),
        ]);

        assert_eq!(deltas.len(), 3);

        let removed = &deltas[1];
        assert_eq!(removed.action, BookAction::Delete);
        assert_eq!(removed.order.order_id, 42);

        let added = &deltas[2];
        assert_eq!(added.action, BookAction::Add);
        assert_eq!(added.order.order_id, 43);
        assert_eq!(added.order.price.as_f64(), 151.0);
        assert_eq!(added.order.size.as_f64(), 150.0);
        assert_eq!(added.order.side, OrderSide::Buy);
    }

    #[rstest]
    fn test_replace_inherits_side_from_original() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_replace_order_msg(1, 42, 43, 200, 1_490_000),
        ]);

        assert_eq!(deltas[2].order.side, OrderSide::Sell);
    }

    #[rstest]
    fn test_replace_delete_has_no_f_last_flag() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_replace_order_msg(1, 42, 43, 100, 1_510_000),
        ]);

        assert_eq!(deltas[1].flags, 0);
        assert_ne!(deltas[2].flags & F_LAST, 0);
    }

    #[rstest]
    fn test_replace_unknown_order_is_ignored() {
        let deltas = parse_aapl(&[build_replace_order_msg(1, 999, 1000, 100, 1_500_000)]);

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_end_of_messages_emits_clear() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_system_event_msg(0, b'C'),
        ]);

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Clear);
    }

    #[rstest]
    fn test_end_of_messages_with_different_locate() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_system_event_msg(99, b'C'),
        ]);

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Clear);
    }

    #[rstest]
    fn test_filters_by_stock_locate() {
        let buf = [
            build_stock_directory_msg(1, &padded_symbol("AAPL")),
            build_stock_directory_msg(2, &padded_symbol("MSFT")),
            build_add_order_msg(2, 10, b'B', 50, 3_000_000),
            build_add_order_msg(1, 11, b'S', 200, 1_500_000),
        ]
        .concat();

        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].order.order_id, 11);
    }

    #[rstest]
    fn test_messages_before_directory_are_ignored() {
        let buf = [
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_stock_directory_msg(1, &padded_symbol("AAPL")),
            build_add_order_msg(1, 43, b'B', 100, 1_500_000),
        ]
        .concat();

        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].order.order_id, 43);
    }

    #[rstest]
    fn test_timestamp_offset_from_midnight() {
        let base_ns: u64 = 1_548_806_400_000_000_000;
        let itch_ts: u64 = 34_200_000_000_000;
        let message = build_add_order_msg_with_ts(1, 42, b'B', 100, 1_500_000, itch_ts);
        let buf = aapl_stream_with(&[message]);

        let mut parser = setup_parser(base_ns);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas[0].ts_event, UnixNanos::from(base_ns + itch_ts));
        assert_eq!(deltas[0].ts_init, deltas[0].ts_event);
    }

    #[rstest]
    fn test_f_last_set_on_final_delta() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
        ]);

        assert_eq!(deltas.len(), 2);
        assert_ne!(deltas[1].flags & F_LAST, 0);
    }

    #[rstest]
    fn test_sequence_numbers_are_monotonic() {
        let deltas = parse_aapl(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_msg(1, 42, 50, 1001),
            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
            build_delete_order_msg(1, 43),
        ]);

        for pair in deltas.windows(2) {
            assert!(pair[1].sequence > pair[0].sequence);
        }
    }

    #[rstest]
    fn test_empty_stream() {
        let empty: &[u8] = &[];
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(empty).unwrap();

        assert_eq!(deltas.len(), 0);
    }

    /// Frames one ITCH message: 2-byte big-endian length, tag, stock locate,
    /// tracking number (always 0), 6-byte big-endian timestamp, then `body`.
    fn build_msg(tag: u8, stock_locate: u16, timestamp: u64, body: &[u8]) -> Vec<u8> {
        let msg_len = (1 + 2 + 2 + 6 + body.len()) as u16;
        let mut framed = Vec::new();
        framed.extend_from_slice(&msg_len.to_be_bytes());
        framed.push(tag);
        framed.extend_from_slice(&stock_locate.to_be_bytes());
        framed.extend_from_slice(&0u16.to_be_bytes());
        // The low 48 bits of the timestamp, most significant byte first.
        framed.extend_from_slice(&timestamp.to_be_bytes()[2..]);
        framed.extend_from_slice(body);
        framed
    }

    /// Builds a stock directory (`R`) message registering `stock` under `locate`.
    fn build_stock_directory_msg(locate: u16, stock: &[u8; 8]) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(stock);
        body.extend_from_slice(&[b'Q', b'N']);
        body.extend_from_slice(&100u32.to_be_bytes());
        body.extend_from_slice(&[b'Y', b'C']);
        body.extend_from_slice(&[b'C', b' ']);
        body.extend_from_slice(&[b'P', b'N', b'N', b'1', b'N']);
        body.extend_from_slice(&0u32.to_be_bytes());
        body.push(b'N');
        build_msg(b'R', locate, 0, &body)
    }

    /// Builds an add order (`A`) message with a zero timestamp.
    fn build_add_order_msg(
        locate: u16,
        reference: u64,
        side: u8,
        shares: u32,
        price: u32,
    ) -> Vec<u8> {
        build_add_order_msg_with_ts(locate, reference, side, shares, price, 0)
    }

    /// Builds an add order (`A`) message for AAPL with an explicit timestamp.
    fn build_add_order_msg_with_ts(
        locate: u16,
        reference: u64,
        side: u8,
        shares: u32,
        price: u32,
        timestamp: u64,
    ) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(&reference.to_be_bytes());
        body.push(side);
        body.extend_from_slice(&shares.to_be_bytes());
        body.extend_from_slice(&padded_symbol("AAPL"));
        body.extend_from_slice(&price.to_be_bytes());
        build_msg(b'A', locate, timestamp, &body)
    }

    /// Builds an order delete (`D`) message.
    fn build_delete_order_msg(locate: u16, reference: u64) -> Vec<u8> {
        build_msg(b'D', locate, 0, &reference.to_be_bytes())
    }

    /// Builds an order cancel (`X`) message.
    fn build_order_cancelled_msg(locate: u16, reference: u64, cancelled: u32) -> Vec<u8> {
        let body = [&reference.to_be_bytes()[..], &cancelled.to_be_bytes()[..]].concat();
        build_msg(b'X', locate, 0, &body)
    }

    /// Builds an order executed (`E`) message.
    fn build_order_executed_msg(
        locate: u16,
        reference: u64,
        executed: u32,
        match_number: u64,
    ) -> Vec<u8> {
        let body = [
            &reference.to_be_bytes()[..],
            &executed.to_be_bytes()[..],
            &match_number.to_be_bytes()[..],
        ]
        .concat();
        build_msg(b'E', locate, 0, &body)
    }

    /// Builds an order executed with price (`C`) message.
    fn build_order_executed_with_price_msg(
        locate: u16,
        reference: u64,
        executed: u32,
        match_number: u64,
        price: u32,
    ) -> Vec<u8> {
        let body = [
            &reference.to_be_bytes()[..],
            &executed.to_be_bytes()[..],
            &match_number.to_be_bytes()[..],
            &[b'Y'][..],
            &price.to_be_bytes()[..],
        ]
        .concat();
        build_msg(b'C', locate, 0, &body)
    }

    /// Builds an order replace (`U`) message.
    fn build_replace_order_msg(
        locate: u16,
        old_reference: u64,
        new_reference: u64,
        shares: u32,
        price: u32,
    ) -> Vec<u8> {
        let body = [
            &old_reference.to_be_bytes()[..],
            &new_reference.to_be_bytes()[..],
            &shares.to_be_bytes()[..],
            &price.to_be_bytes()[..],
        ]
        .concat();
        build_msg(b'U', locate, 0, &body)
    }

    /// Builds a system event (`S`) message carrying `event_code`.
    fn build_system_event_msg(locate: u16, event_code: u8) -> Vec<u8> {
        build_msg(b'S', locate, 0, &[event_code])
    }

    /// Converts a full-day ITCH capture into a Parquet file of AAPL deltas.
    ///
    /// Reads and writes fixed paths under `/tmp`, so it is ignored by default.
    #[rstest]
    #[ignore = "one-time dataset curation, not for routine CI"]
    fn test_curate_aapl_itch() {
        let itch_path = PathBuf::from("/tmp/01302019.NASDAQ_ITCH50.gz");
        let parquet_path = "/tmp/itch_AAPL.XNAS_2019-01-30_deltas.parquet";
        let instrument_id = InstrumentId::from("AAPL.XNAS");
        let base_ns: u64 = 1_548_824_400_000_000_000;

        println!("Parsing ITCH from {}", itch_path.display());
        let mut parser = ItchParser::new(instrument_id, "AAPL", base_ns);
        let deltas = parser.parse_gzip_file(&itch_path).unwrap();
        let count = deltas.len();
        println!("Parsed {count} deltas for AAPL");

        let metadata =
            OrderBookDelta::get_metadata(&instrument_id, PRICE_PRECISION, SIZE_PRECISION);
        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));

        println!("Writing Parquet to {parquet_path}");
        let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
            .set_max_row_group_size(1_000_000)
            .build();
        let file = File::create(parquet_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();

        let chunk_size = 1_000_000;
        for (index, chunk) in deltas.chunks(chunk_size).enumerate() {
            println!(" Encoding chunk {} ({} records)...", index + 1, chunk.len());
            let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let file_size = fs::metadata(parquet_path).unwrap().len();
        println!("\nRecords: {count}");
        println!("Price precision: {PRICE_PRECISION}");
        println!("Size precision: {SIZE_PRECISION}");
        println!(
            "File size: {} bytes ({:.1} MB)",
            file_size,
            file_size as f64 / 1_048_576.0
        );
        println!("Output: {parquet_path}");
        println!("\nNext steps:");
        println!(" sha256sum {parquet_path}");
    }
}