Skip to main content

nautilus_testkit/itch/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! ITCH 5.0 message to [`OrderBookDelta`] conversion.
17
18use std::{io::Read, path::Path};
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23    data::{delta::OrderBookDelta, order::BookOrder},
24    enums::{BookAction, OrderSide, RecordFlag},
25    identifiers::InstrumentId,
26    types::{Price, Quantity},
27};
28
/// Decimal places carried by converted prices: four, i.e. $0.0001 increments
/// for US equities.
const PRICE_PRECISION: u8 = 4;

/// Decimal places carried by converted sizes: zero, as US equities are sized
/// in whole shares.
const SIZE_PRECISION: u8 = 0;
34
35#[derive(Debug)]
36struct OrderState {
37    price: Price,
38    size: u32,
39    side: OrderSide,
40}
41
42/// Converts a stream of ITCH 5.0 messages into [`OrderBookDelta`] events
43/// for a single instrument.
44///
45/// Maintains internal order state to compute remaining sizes after partial
46/// executions and cancellations.
47#[derive(Debug)]
48pub struct ItchParser {
49    instrument_id: InstrumentId,
50    target_locate: Option<u16>,
51    target_stock: String,
52    base_ns: u64,
53    orders: AHashMap<u64, OrderState>,
54    sequence: u64,
55}
56
57impl ItchParser {
58    /// Creates a new [`ItchParser`] for the given instrument.
59    ///
60    /// # Arguments
61    ///
62    /// - `instrument_id` - The NautilusTrader instrument ID for output deltas.
63    /// - `stock` - The ITCH stock symbol to filter for (e.g., "AAPL").
64    /// - `base_ns` - Base UNIX nanoseconds for midnight of the trading day
65    ///   (ITCH timestamps are nanoseconds since midnight).
66    pub fn new(instrument_id: InstrumentId, stock: &str, base_ns: u64) -> Self {
67        Self {
68            instrument_id,
69            target_locate: None,
70            target_stock: stock.to_string(),
71            base_ns,
72            orders: AHashMap::new(),
73            sequence: 0,
74        }
75    }
76
77    /// Parses all ITCH messages from a gzip-compressed file and returns
78    /// the filtered [`OrderBookDelta`] events.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the file cannot be opened or contains invalid data.
83    pub fn parse_gzip_file(&mut self, path: &Path) -> anyhow::Result<Vec<OrderBookDelta>> {
84        let stream = itchy::MessageStream::from_gzip(path)
85            .map_err(|e| anyhow::anyhow!("Failed to open ITCH gzip: {e}"))?;
86        self.parse_stream(stream)
87    }
88
89    /// Parses all ITCH messages from a reader and returns filtered deltas.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the stream contains invalid data.
94    pub fn parse_reader<R: Read>(&mut self, reader: R) -> anyhow::Result<Vec<OrderBookDelta>> {
95        let stream = itchy::MessageStream::from_reader(reader);
96        self.parse_stream(stream)
97    }
98
99    fn parse_stream<R: Read>(
100        &mut self,
101        stream: itchy::MessageStream<R>,
102    ) -> anyhow::Result<Vec<OrderBookDelta>> {
103        let mut deltas = Vec::new();
104
105        for result in stream {
106            let msg = result.map_err(|e| anyhow::anyhow!("ITCH parse error: {e}"))?;
107
108            // Handle feed-level messages before stock locate filtering
109            match msg.body {
110                itchy::Body::StockDirectory(ref dir) => {
111                    let symbol = dir.stock.trim();
112                    if symbol == self.target_stock {
113                        self.target_locate = Some(msg.stock_locate);
114                    }
115                    continue;
116                }
117                itchy::Body::SystemEvent {
118                    event: itchy::EventCode::EndOfMessages,
119                } => {
120                    let ts = UnixNanos::from(self.base_ns + msg.timestamp);
121                    self.handle_end_of_messages(ts, &mut deltas);
122                    continue;
123                }
124                _ => {}
125            }
126
127            // Filter by target stock locate
128            let Some(locate) = self.target_locate else {
129                continue;
130            };
131            if msg.stock_locate != locate {
132                continue;
133            }
134
135            let ts = UnixNanos::from(self.base_ns + msg.timestamp);
136
137            match msg.body {
138                itchy::Body::AddOrder(ref add) => {
139                    self.handle_add_order(add, ts, &mut deltas);
140                }
141                itchy::Body::DeleteOrder { reference } => {
142                    self.handle_delete_order(reference, ts, &mut deltas);
143                }
144                itchy::Body::OrderCancelled {
145                    reference,
146                    cancelled,
147                } => {
148                    self.handle_cancel(reference, cancelled, ts, &mut deltas);
149                }
150                itchy::Body::OrderExecuted {
151                    reference,
152                    executed,
153                    ..
154                } => {
155                    self.handle_execution(reference, executed, ts, &mut deltas);
156                }
157                itchy::Body::OrderExecutedWithPrice {
158                    reference,
159                    executed,
160                    ..
161                } => {
162                    self.handle_execution(reference, executed, ts, &mut deltas);
163                }
164                itchy::Body::ReplaceOrder(ref replace) => {
165                    self.handle_replace(replace, ts, &mut deltas);
166                }
167                _ => {}
168            }
169        }
170
171        // Set F_LAST on the final delta
172        if let Some(last) = deltas.last_mut() {
173            last.flags |= RecordFlag::F_LAST as u8;
174        }
175
176        Ok(deltas)
177    }
178
179    fn handle_add_order(
180        &mut self,
181        add: &itchy::AddOrder,
182        ts: UnixNanos,
183        deltas: &mut Vec<OrderBookDelta>,
184    ) {
185        let side = convert_side(add.side);
186        let price = convert_price(add.price);
187
188        self.orders.insert(
189            add.reference,
190            OrderState {
191                price,
192                size: add.shares,
193                side,
194            },
195        );
196
197        self.sequence += 1;
198        let order = BookOrder::new(
199            side,
200            price,
201            Quantity::new(f64::from(add.shares), SIZE_PRECISION),
202            add.reference,
203        );
204        deltas.push(OrderBookDelta::new(
205            self.instrument_id,
206            BookAction::Add,
207            order,
208            RecordFlag::F_LAST as u8,
209            self.sequence,
210            ts,
211            ts,
212        ));
213    }
214
215    fn handle_delete_order(
216        &mut self,
217        reference: u64,
218        ts: UnixNanos,
219        deltas: &mut Vec<OrderBookDelta>,
220    ) {
221        if let Some(state) = self.orders.remove(&reference) {
222            self.sequence += 1;
223            let order = BookOrder::new(
224                state.side,
225                state.price,
226                Quantity::new(0.0, SIZE_PRECISION),
227                reference,
228            );
229            deltas.push(OrderBookDelta::new(
230                self.instrument_id,
231                BookAction::Delete,
232                order,
233                RecordFlag::F_LAST as u8,
234                self.sequence,
235                ts,
236                ts,
237            ));
238        }
239    }
240
241    fn handle_cancel(
242        &mut self,
243        reference: u64,
244        cancelled: u32,
245        ts: UnixNanos,
246        deltas: &mut Vec<OrderBookDelta>,
247    ) {
248        if let Some(state) = self.orders.get_mut(&reference) {
249            state.size = state.size.saturating_sub(cancelled);
250
251            if state.size == 0 {
252                // Full cancel
253                let state = self.orders.remove(&reference).unwrap();
254                self.sequence += 1;
255                let order = BookOrder::new(
256                    state.side,
257                    state.price,
258                    Quantity::new(0.0, SIZE_PRECISION),
259                    reference,
260                );
261                deltas.push(OrderBookDelta::new(
262                    self.instrument_id,
263                    BookAction::Delete,
264                    order,
265                    RecordFlag::F_LAST as u8,
266                    self.sequence,
267                    ts,
268                    ts,
269                ));
270            } else {
271                // Partial cancel
272                self.sequence += 1;
273                let order = BookOrder::new(
274                    state.side,
275                    state.price,
276                    Quantity::new(f64::from(state.size), SIZE_PRECISION),
277                    reference,
278                );
279                deltas.push(OrderBookDelta::new(
280                    self.instrument_id,
281                    BookAction::Update,
282                    order,
283                    RecordFlag::F_LAST as u8,
284                    self.sequence,
285                    ts,
286                    ts,
287                ));
288            }
289        }
290    }
291
292    fn handle_execution(
293        &mut self,
294        reference: u64,
295        executed: u32,
296        ts: UnixNanos,
297        deltas: &mut Vec<OrderBookDelta>,
298    ) {
299        if let Some(state) = self.orders.get_mut(&reference) {
300            state.size = state.size.saturating_sub(executed);
301
302            if state.size == 0 {
303                // Fully consumed
304                let state = self.orders.remove(&reference).unwrap();
305                self.sequence += 1;
306                let order = BookOrder::new(
307                    state.side,
308                    state.price,
309                    Quantity::new(0.0, SIZE_PRECISION),
310                    reference,
311                );
312                deltas.push(OrderBookDelta::new(
313                    self.instrument_id,
314                    BookAction::Delete,
315                    order,
316                    RecordFlag::F_LAST as u8,
317                    self.sequence,
318                    ts,
319                    ts,
320                ));
321            } else {
322                // Partial execution
323                self.sequence += 1;
324                let order = BookOrder::new(
325                    state.side,
326                    state.price,
327                    Quantity::new(f64::from(state.size), SIZE_PRECISION),
328                    reference,
329                );
330                deltas.push(OrderBookDelta::new(
331                    self.instrument_id,
332                    BookAction::Update,
333                    order,
334                    RecordFlag::F_LAST as u8,
335                    self.sequence,
336                    ts,
337                    ts,
338                ));
339            }
340        }
341    }
342
343    fn handle_replace(
344        &mut self,
345        replace: &itchy::ReplaceOrder,
346        ts: UnixNanos,
347        deltas: &mut Vec<OrderBookDelta>,
348    ) {
349        // Delete old order
350        if let Some(old_state) = self.orders.remove(&replace.old_reference) {
351            self.sequence += 1;
352            let old_order = BookOrder::new(
353                old_state.side,
354                old_state.price,
355                Quantity::new(0.0, SIZE_PRECISION),
356                replace.old_reference,
357            );
358            deltas.push(OrderBookDelta::new(
359                self.instrument_id,
360                BookAction::Delete,
361                old_order,
362                0, // Not the last in this event group
363                self.sequence,
364                ts,
365                ts,
366            ));
367
368            // Add new order (inherits side from old order)
369            let new_price = convert_price(replace.price);
370            self.orders.insert(
371                replace.new_reference,
372                OrderState {
373                    price: new_price,
374                    size: replace.shares,
375                    side: old_state.side,
376                },
377            );
378
379            self.sequence += 1;
380            let new_order = BookOrder::new(
381                old_state.side,
382                new_price,
383                Quantity::new(f64::from(replace.shares), SIZE_PRECISION),
384                replace.new_reference,
385            );
386            deltas.push(OrderBookDelta::new(
387                self.instrument_id,
388                BookAction::Add,
389                new_order,
390                RecordFlag::F_LAST as u8,
391                self.sequence,
392                ts,
393                ts,
394            ));
395        }
396    }
397
398    fn handle_end_of_messages(&mut self, ts: UnixNanos, deltas: &mut Vec<OrderBookDelta>) {
399        self.sequence += 1;
400        deltas.push(OrderBookDelta::clear(
401            self.instrument_id,
402            self.sequence,
403            ts,
404            ts,
405        ));
406    }
407}
408
409fn convert_side(side: itchy::Side) -> OrderSide {
410    match side {
411        itchy::Side::Buy => OrderSide::Buy,
412        itchy::Side::Sell => OrderSide::Sell,
413    }
414}
415
416fn convert_price(price: itchy::Price4) -> Price {
417    Price::new(f64::from(price.raw()) / 10_000.0, PRICE_PRECISION)
418}
419
420#[cfg(test)]
421mod tests {
422    use std::{fs, fs::File, path::PathBuf, sync::Arc};
423
424    use nautilus_model::data::OrderBookDelta;
425    use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
426    use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
427    use rstest::rstest;
428
429    use super::*;
430
431    const AAPL_ID: &str = "AAPL.XNAS";
432
433    fn setup_parser(base_ns: u64) -> ItchParser {
434        ItchParser::new(InstrumentId::from(AAPL_ID), "AAPL", base_ns)
435    }
436
437    fn aapl_stream_with(messages: &[Vec<u8>]) -> Vec<u8> {
438        let mut buf = build_stock_directory_msg(1, b"AAPL    ");
439        for msg in messages {
440            buf.extend_from_slice(msg);
441        }
442        buf
443    }
444
445    #[rstest]
446    fn test_convert_side() {
447        assert_eq!(convert_side(itchy::Side::Buy), OrderSide::Buy);
448        assert_eq!(convert_side(itchy::Side::Sell), OrderSide::Sell);
449    }
450
451    #[rstest]
452    fn test_convert_price() {
453        let price = convert_price(itchy::Price4::from(1_2345));
454        assert_eq!(price.as_f64(), 1.2345);
455        assert_eq!(price.precision, PRICE_PRECISION);
456    }
457
458    #[rstest]
459    fn test_convert_price_whole_dollar() {
460        let price = convert_price(itchy::Price4::from(100_0000));
461        assert_eq!(price.as_f64(), 100.0);
462    }
463
464    #[rstest]
465    fn test_convert_price_sub_penny() {
466        let price = convert_price(itchy::Price4::from(150_2501));
467        assert_eq!(price.as_f64(), 150.2501);
468    }
469
470    #[rstest]
471    fn test_add_order() {
472        let buf = aapl_stream_with(&[build_add_order_msg(1, 42, b'B', 100, 1_502_500)]);
473        let mut parser = setup_parser(0);
474        let deltas = parser.parse_reader(&buf[..]).unwrap();
475
476        assert_eq!(deltas.len(), 1);
477        assert_eq!(deltas[0].action, BookAction::Add);
478        assert_eq!(deltas[0].order.side, OrderSide::Buy);
479        assert_eq!(deltas[0].order.price.as_f64(), 150.25);
480        assert_eq!(deltas[0].order.size.as_f64(), 100.0);
481        assert_eq!(deltas[0].order.order_id, 42);
482    }
483
484    #[rstest]
485    fn test_delete_order() {
486        let buf = aapl_stream_with(&[
487            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
488            build_delete_order_msg(1, 42),
489        ]);
490        let mut parser = setup_parser(0);
491        let deltas = parser.parse_reader(&buf[..]).unwrap();
492
493        assert_eq!(deltas.len(), 2);
494        assert_eq!(deltas[1].action, BookAction::Delete);
495        assert_eq!(deltas[1].order.order_id, 42);
496        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
497    }
498
499    #[rstest]
500    fn test_delete_unknown_order_is_ignored() {
501        let buf = aapl_stream_with(&[build_delete_order_msg(1, 999)]);
502        let mut parser = setup_parser(0);
503        let deltas = parser.parse_reader(&buf[..]).unwrap();
504
505        assert_eq!(deltas.len(), 0);
506    }
507
508    #[rstest]
509    fn test_partial_cancel_reduces_size() {
510        let buf = aapl_stream_with(&[
511            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
512            build_order_cancelled_msg(1, 42, 30),
513        ]);
514        let mut parser = setup_parser(0);
515        let deltas = parser.parse_reader(&buf[..]).unwrap();
516
517        assert_eq!(deltas.len(), 2);
518        assert_eq!(deltas[1].action, BookAction::Update);
519        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
520        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
521        assert_eq!(deltas[1].order.order_id, 42);
522    }
523
524    #[rstest]
525    fn test_full_cancel_deletes_order() {
526        let buf = aapl_stream_with(&[
527            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
528            build_order_cancelled_msg(1, 42, 100),
529        ]);
530        let mut parser = setup_parser(0);
531        let deltas = parser.parse_reader(&buf[..]).unwrap();
532
533        assert_eq!(deltas.len(), 2);
534        assert_eq!(deltas[1].action, BookAction::Delete);
535        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
536    }
537
538    #[rstest]
539    fn test_cancel_unknown_order_is_ignored() {
540        let buf = aapl_stream_with(&[build_order_cancelled_msg(1, 999, 50)]);
541        let mut parser = setup_parser(0);
542        let deltas = parser.parse_reader(&buf[..]).unwrap();
543
544        assert_eq!(deltas.len(), 0);
545    }
546
547    #[rstest]
548    fn test_partial_execution_updates_size() {
549        let buf = aapl_stream_with(&[
550            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
551            build_order_executed_msg(1, 42, 40, 1001),
552        ]);
553        let mut parser = setup_parser(0);
554        let deltas = parser.parse_reader(&buf[..]).unwrap();
555
556        assert_eq!(deltas.len(), 2);
557        assert_eq!(deltas[1].action, BookAction::Update);
558        assert_eq!(deltas[1].order.size.as_f64(), 60.0);
559        assert_eq!(deltas[1].order.order_id, 42);
560    }
561
562    #[rstest]
563    fn test_full_execution_deletes_order() {
564        let buf = aapl_stream_with(&[
565            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
566            build_order_executed_msg(1, 42, 100, 1001),
567        ]);
568        let mut parser = setup_parser(0);
569        let deltas = parser.parse_reader(&buf[..]).unwrap();
570
571        assert_eq!(deltas.len(), 2);
572        assert_eq!(deltas[1].action, BookAction::Delete);
573        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
574    }
575
576    #[rstest]
577    fn test_multiple_partial_executions_then_full() {
578        let buf = aapl_stream_with(&[
579            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
580            build_order_executed_msg(1, 42, 30, 1001),
581            build_order_executed_msg(1, 42, 30, 1002),
582            build_order_executed_msg(1, 42, 40, 1003),
583        ]);
584        let mut parser = setup_parser(0);
585        let deltas = parser.parse_reader(&buf[..]).unwrap();
586
587        assert_eq!(deltas.len(), 4);
588        assert_eq!(deltas[1].action, BookAction::Update);
589        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
590        assert_eq!(deltas[2].action, BookAction::Update);
591        assert_eq!(deltas[2].order.size.as_f64(), 40.0);
592        assert_eq!(deltas[3].action, BookAction::Delete);
593        assert_eq!(deltas[3].order.size.as_f64(), 0.0);
594    }
595
596    #[rstest]
597    fn test_executed_with_price_partial() {
598        let buf = aapl_stream_with(&[
599            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
600            build_order_executed_with_price_msg(1, 42, 25, 2001, 1_505_000),
601        ]);
602        let mut parser = setup_parser(0);
603        let deltas = parser.parse_reader(&buf[..]).unwrap();
604
605        assert_eq!(deltas.len(), 2);
606        assert_eq!(deltas[1].action, BookAction::Update);
607        assert_eq!(deltas[1].order.size.as_f64(), 75.0);
608        // Book order retains the resting price, not the execution price
609        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
610    }
611
612    #[rstest]
613    fn test_execution_unknown_order_is_ignored() {
614        let buf = aapl_stream_with(&[build_order_executed_msg(1, 999, 50, 1001)]);
615        let mut parser = setup_parser(0);
616        let deltas = parser.parse_reader(&buf[..]).unwrap();
617
618        assert_eq!(deltas.len(), 0);
619    }
620
621    #[rstest]
622    fn test_replace_order() {
623        let buf = aapl_stream_with(&[
624            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
625            build_replace_order_msg(1, 42, 43, 150, 1_510_000),
626        ]);
627        let mut parser = setup_parser(0);
628        let deltas = parser.parse_reader(&buf[..]).unwrap();
629
630        assert_eq!(deltas.len(), 3);
631        assert_eq!(deltas[1].action, BookAction::Delete);
632        assert_eq!(deltas[1].order.order_id, 42);
633        assert_eq!(deltas[2].action, BookAction::Add);
634        assert_eq!(deltas[2].order.order_id, 43);
635        assert_eq!(deltas[2].order.price.as_f64(), 151.0);
636        assert_eq!(deltas[2].order.size.as_f64(), 150.0);
637        assert_eq!(deltas[2].order.side, OrderSide::Buy);
638    }
639
640    #[rstest]
641    fn test_replace_inherits_side_from_original() {
642        let buf = aapl_stream_with(&[
643            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
644            build_replace_order_msg(1, 42, 43, 200, 1_490_000),
645        ]);
646        let mut parser = setup_parser(0);
647        let deltas = parser.parse_reader(&buf[..]).unwrap();
648
649        assert_eq!(deltas[2].order.side, OrderSide::Sell);
650    }
651
652    #[rstest]
653    fn test_replace_delete_has_no_f_last_flag() {
654        // The delete in a replace pair is not the last event in the group
655        let buf = aapl_stream_with(&[
656            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
657            build_replace_order_msg(1, 42, 43, 100, 1_510_000),
658        ]);
659        let mut parser = setup_parser(0);
660        let deltas = parser.parse_reader(&buf[..]).unwrap();
661
662        assert_eq!(deltas[1].flags, 0);
663        assert_ne!(deltas[2].flags & RecordFlag::F_LAST as u8, 0);
664    }
665
666    #[rstest]
667    fn test_replace_unknown_order_is_ignored() {
668        let buf = aapl_stream_with(&[build_replace_order_msg(1, 999, 1000, 100, 1_500_000)]);
669        let mut parser = setup_parser(0);
670        let deltas = parser.parse_reader(&buf[..]).unwrap();
671
672        assert_eq!(deltas.len(), 0);
673    }
674
675    #[rstest]
676    fn test_end_of_messages_emits_clear() {
677        let buf = aapl_stream_with(&[
678            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
679            // EndOfMessages uses locate=0 (feed-level, not stock-specific)
680            build_system_event_msg(0, b'C'),
681        ]);
682        let mut parser = setup_parser(0);
683        let deltas = parser.parse_reader(&buf[..]).unwrap();
684
685        assert_eq!(deltas.len(), 2);
686        assert_eq!(deltas[1].action, BookAction::Clear);
687    }
688
689    #[rstest]
690    fn test_end_of_messages_with_different_locate() {
691        // Regression: EndOfMessages must be processed regardless of locate code
692        let buf = aapl_stream_with(&[
693            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
694            build_system_event_msg(99, b'C'),
695        ]);
696        let mut parser = setup_parser(0);
697        let deltas = parser.parse_reader(&buf[..]).unwrap();
698
699        assert_eq!(deltas.len(), 2);
700        assert_eq!(deltas[1].action, BookAction::Clear);
701    }
702
703    #[rstest]
704    fn test_filters_by_stock_locate() {
705        let mut buf = build_stock_directory_msg(1, b"AAPL    ");
706        buf.extend_from_slice(&build_stock_directory_msg(2, b"MSFT    "));
707        buf.extend_from_slice(&build_add_order_msg(2, 10, b'B', 50, 3_000_000));
708        buf.extend_from_slice(&build_add_order_msg(1, 11, b'S', 200, 1_500_000));
709
710        let mut parser = setup_parser(0);
711        let deltas = parser.parse_reader(&buf[..]).unwrap();
712
713        assert_eq!(deltas.len(), 1);
714        assert_eq!(deltas[0].order.order_id, 11);
715    }
716
717    #[rstest]
718    fn test_messages_before_directory_are_ignored() {
719        let mut buf = Vec::new();
720        // AddOrder arrives before any StockDirectory
721        buf.extend_from_slice(&build_add_order_msg(1, 42, b'B', 100, 1_500_000));
722        buf.extend_from_slice(&build_stock_directory_msg(1, b"AAPL    "));
723        buf.extend_from_slice(&build_add_order_msg(1, 43, b'B', 100, 1_500_000));
724
725        let mut parser = setup_parser(0);
726        let deltas = parser.parse_reader(&buf[..]).unwrap();
727
728        // Only the second add (after directory) should be captured
729        assert_eq!(deltas.len(), 1);
730        assert_eq!(deltas[0].order.order_id, 43);
731    }
732
733    #[rstest]
734    fn test_timestamp_offset_from_midnight() {
735        let base_ns: u64 = 1_548_806_400_000_000_000; // 2019-01-30 midnight UTC
736        let itch_ts: u64 = 34_200_000_000_000; // 9:30 AM (ns since midnight)
737        let buf = aapl_stream_with(&[build_add_order_msg_with_ts(
738            1, 42, b'B', 100, 1_500_000, itch_ts,
739        )]);
740        let mut parser = setup_parser(base_ns);
741        let deltas = parser.parse_reader(&buf[..]).unwrap();
742
743        assert_eq!(deltas[0].ts_event, UnixNanos::from(base_ns + itch_ts));
744        assert_eq!(deltas[0].ts_init, deltas[0].ts_event);
745    }
746
747    #[rstest]
748    fn test_f_last_set_on_final_delta() {
749        let buf = aapl_stream_with(&[
750            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
751            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
752        ]);
753        let mut parser = setup_parser(0);
754        let deltas = parser.parse_reader(&buf[..]).unwrap();
755
756        assert_eq!(deltas.len(), 2);
757        assert_ne!(deltas[1].flags & RecordFlag::F_LAST as u8, 0);
758    }
759
760    #[rstest]
761    fn test_sequence_numbers_are_monotonic() {
762        let buf = aapl_stream_with(&[
763            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
764            build_order_executed_msg(1, 42, 50, 1001),
765            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
766            build_delete_order_msg(1, 43),
767        ]);
768        let mut parser = setup_parser(0);
769        let deltas = parser.parse_reader(&buf[..]).unwrap();
770
771        for i in 1..deltas.len() {
772            assert!(deltas[i].sequence > deltas[i - 1].sequence);
773        }
774    }
775
776    #[rstest]
777    fn test_empty_stream() {
778        let buf: &[u8] = &[];
779        let mut parser = setup_parser(0);
780        let deltas = parser.parse_reader(buf).unwrap();
781
782        assert_eq!(deltas.len(), 0);
783    }
784
785    fn build_msg(tag: u8, stock_locate: u16, timestamp: u64, body: &[u8]) -> Vec<u8> {
786        let msg_len = (1 + 2 + 2 + 6 + body.len()) as u16;
787        let mut buf = Vec::new();
788        buf.extend_from_slice(&msg_len.to_be_bytes());
789        buf.push(tag);
790        buf.extend_from_slice(&stock_locate.to_be_bytes());
791        buf.extend_from_slice(&0u16.to_be_bytes()); // tracking_number
792        // 6-byte timestamp (big-endian u48)
793        buf.push((timestamp >> 40) as u8);
794        buf.push((timestamp >> 32) as u8);
795        buf.push((timestamp >> 24) as u8);
796        buf.push((timestamp >> 16) as u8);
797        buf.push((timestamp >> 8) as u8);
798        buf.push(timestamp as u8);
799        buf.extend_from_slice(body);
800        buf
801    }
802
803    fn build_stock_directory_msg(locate: u16, stock: &[u8; 8]) -> Vec<u8> {
804        let mut body = Vec::new();
805        body.extend_from_slice(stock);
806        body.push(b'Q'); // market_category
807        body.push(b'N'); // financial_status
808        body.extend_from_slice(&100u32.to_be_bytes()); // round_lot_size
809        body.push(b'Y'); // round_lots_only
810        body.push(b'C'); // issue_classification
811        body.extend_from_slice(b"C "); // issue_subtype
812        body.push(b'P'); // authenticity
813        body.push(b'N'); // short_sale_threshold
814        body.push(b'N'); // ipo_flag
815        body.push(b'1'); // luld_ref_price_tier
816        body.push(b'N'); // etp_flag
817        body.extend_from_slice(&0u32.to_be_bytes()); // etp_leverage_factor
818        body.push(b'N'); // inverse_indicator
819        build_msg(b'R', locate, 0, &body)
820    }
821
822    fn build_add_order_msg(
823        locate: u16,
824        reference: u64,
825        side: u8,
826        shares: u32,
827        price: u32,
828    ) -> Vec<u8> {
829        build_add_order_msg_with_ts(locate, reference, side, shares, price, 0)
830    }
831
832    fn build_add_order_msg_with_ts(
833        locate: u16,
834        reference: u64,
835        side: u8,
836        shares: u32,
837        price: u32,
838        timestamp: u64,
839    ) -> Vec<u8> {
840        let mut body = Vec::new();
841        body.extend_from_slice(&reference.to_be_bytes());
842        body.push(side);
843        body.extend_from_slice(&shares.to_be_bytes());
844        body.extend_from_slice(b"AAPL    ");
845        body.extend_from_slice(&price.to_be_bytes());
846        build_msg(b'A', locate, timestamp, &body)
847    }
848
849    fn build_delete_order_msg(locate: u16, reference: u64) -> Vec<u8> {
850        build_msg(b'D', locate, 0, &reference.to_be_bytes())
851    }
852
853    fn build_order_cancelled_msg(locate: u16, reference: u64, cancelled: u32) -> Vec<u8> {
854        let mut body = Vec::new();
855        body.extend_from_slice(&reference.to_be_bytes());
856        body.extend_from_slice(&cancelled.to_be_bytes());
857        build_msg(b'X', locate, 0, &body)
858    }
859
860    fn build_order_executed_msg(
861        locate: u16,
862        reference: u64,
863        executed: u32,
864        match_number: u64,
865    ) -> Vec<u8> {
866        let mut body = Vec::new();
867        body.extend_from_slice(&reference.to_be_bytes());
868        body.extend_from_slice(&executed.to_be_bytes());
869        body.extend_from_slice(&match_number.to_be_bytes());
870        build_msg(b'E', locate, 0, &body)
871    }
872
873    fn build_order_executed_with_price_msg(
874        locate: u16,
875        reference: u64,
876        executed: u32,
877        match_number: u64,
878        price: u32,
879    ) -> Vec<u8> {
880        let mut body = Vec::new();
881        body.extend_from_slice(&reference.to_be_bytes());
882        body.extend_from_slice(&executed.to_be_bytes());
883        body.extend_from_slice(&match_number.to_be_bytes());
884        body.push(b'Y'); // printable
885        body.extend_from_slice(&price.to_be_bytes());
886        build_msg(b'C', locate, 0, &body)
887    }
888
889    fn build_replace_order_msg(
890        locate: u16,
891        old_reference: u64,
892        new_reference: u64,
893        shares: u32,
894        price: u32,
895    ) -> Vec<u8> {
896        let mut body = Vec::new();
897        body.extend_from_slice(&old_reference.to_be_bytes());
898        body.extend_from_slice(&new_reference.to_be_bytes());
899        body.extend_from_slice(&shares.to_be_bytes());
900        body.extend_from_slice(&price.to_be_bytes());
901        build_msg(b'U', locate, 0, &body)
902    }
903
904    fn build_system_event_msg(locate: u16, event_code: u8) -> Vec<u8> {
905        build_msg(b'S', locate, 0, &[event_code])
906    }
907
908    // Curates AAPL L3 deltas from NASDAQ ITCH 5.0 binary into NautilusTrader Parquet.
909    // Download source: https://emi.nasdaq.com/ITCH/Nasdaq%20ITCH/01302019.NASDAQ_ITCH50.gz
910    // Run: cargo test -p nautilus-testkit --lib test_curate_aapl_itch -- --ignored --nocapture
911    #[rstest]
912    #[ignore = "one-time dataset curation, not for routine CI"]
913    fn test_curate_aapl_itch() {
914        let itch_path = PathBuf::from("/tmp/01302019.NASDAQ_ITCH50.gz");
915        let instrument_id = InstrumentId::from("AAPL.XNAS");
916
917        // 2019-01-30 midnight EST (UTC-5) as Unix nanoseconds
918        let base_ns: u64 = 1_548_824_400_000_000_000;
919        let parquet_path = "/tmp/itch_AAPL.XNAS_2019-01-30_deltas.parquet";
920
921        println!("Parsing ITCH from {}", itch_path.display());
922        let mut parser = ItchParser::new(instrument_id, "AAPL", base_ns);
923        let deltas = parser.parse_gzip_file(&itch_path).unwrap();
924        let count = deltas.len();
925        println!("Parsed {count} deltas for AAPL");
926
927        let metadata =
928            OrderBookDelta::get_metadata(&instrument_id, PRICE_PRECISION, SIZE_PRECISION);
929        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));
930
931        println!("Writing Parquet to {parquet_path}");
932        let file = File::create(parquet_path).unwrap();
933        let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
934        let props = WriterProperties::builder()
935            .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
936            .set_max_row_group_size(1_000_000)
937            .build();
938        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
939
940        let chunk_size = 1_000_000;
941        for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
942            println!("  Encoding chunk {} ({} records)...", i + 1, chunk.len());
943            let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
944            writer.write(&batch).unwrap();
945        }
946        writer.close().unwrap();
947
948        let file_size = fs::metadata(parquet_path).unwrap().len();
949        println!("\nRecords: {count}");
950        println!("Price precision: {PRICE_PRECISION}");
951        println!("Size precision: {SIZE_PRECISION}");
952        println!(
953            "File size: {} bytes ({:.1} MB)",
954            file_size,
955            file_size as f64 / 1_048_576.0
956        );
957        println!("Output: {parquet_path}");
958        println!("\nNext steps:");
959        println!("  sha256sum {parquet_path}");
960    }
961}