nautilus_tardis/csv/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16mod record;
17
18use std::{
19    error::Error,
20    ffi::OsStr,
21    fs::File,
22    io::{BufReader, Read, Seek, SeekFrom},
23    path::Path,
24    time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31    data::{
32        BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33    },
34    enums::{BookAction, OrderSide, RecordFlag},
35    identifiers::{InstrumentId, TradeId},
36    types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40    csv::record::{
41        TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42        TardisQuoteRecord, TardisTradeRecord,
43    },
44    parse::{
45        parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46        parse_timestamp,
47    },
48};
49use crate::parse::parse_price;
50
/// Infers the number of decimal places needed to represent `value`.
///
/// The value is rendered with its `Display` implementation and the characters following the
/// decimal point are counted. Values which render without a decimal point (whole numbers,
/// `NaN`, infinities) yield `0`.
///
/// The count saturates at [`u8::MAX`] instead of wrapping, so a very small magnitude which
/// renders with more than 255 fractional digits is never mistaken for a low precision.
fn infer_precision(value: f64) -> u8 {
    let rendered = value.to_string(); // Single allocation
    rendered.find('.').map_or(0, |decimal_idx| {
        let fractional_digits = rendered.len() - decimal_idx - 1;
        // Clamp before narrowing so the cast below can never truncate
        fractional_digits.min(usize::from(u8::MAX)) as u8
    })
}
58
59fn create_csv_reader<P: AsRef<Path>>(
60    filepath: P,
61) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
62    let filepath_ref = filepath.as_ref();
63    const MAX_RETRIES: u8 = 3;
64    const DELAY_MS: u64 = 100;
65
66    fn open_file_with_retry<P: AsRef<Path>>(
67        path: P,
68        max_retries: u8,
69        delay_ms: u64,
70    ) -> anyhow::Result<File> {
71        let path_ref = path.as_ref();
72        for attempt in 1..=max_retries {
73            match File::open(path_ref) {
74                Ok(file) => return Ok(file),
75                Err(e) => {
76                    if attempt == max_retries {
77                        anyhow::bail!(
78                            "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
79                        );
80                    }
81                    eprintln!(
82                        "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
83                    );
84                    std::thread::sleep(Duration::from_millis(delay_ms));
85                }
86            }
87        }
88        unreachable!("Loop should return either Ok or Err");
89    }
90
91    let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
92
93    let is_gzipped = filepath_ref
94        .extension()
95        .and_then(OsStr::to_str)
96        .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
97
98    if !is_gzipped {
99        let buf_reader = BufReader::new(file);
100        return Ok(ReaderBuilder::new()
101            .has_headers(true)
102            .from_reader(Box::new(buf_reader)));
103    }
104
105    let file_size = file.metadata()?.len();
106    if file_size < 2 {
107        anyhow::bail!("File too small to be a valid gzip file");
108    }
109
110    let mut header_buf = [0u8; 2];
111    for attempt in 1..=MAX_RETRIES {
112        match file.read_exact(&mut header_buf) {
113            Ok(()) => break,
114            Err(e) => {
115                if attempt == MAX_RETRIES {
116                    anyhow::bail!(
117                        "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
118                    );
119                }
120                eprintln!(
121                    "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
122                );
123                std::thread::sleep(Duration::from_millis(DELAY_MS));
124            }
125        }
126    }
127
128    if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
129        anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
130    }
131
132    for attempt in 1..=MAX_RETRIES {
133        match file.seek(SeekFrom::Start(0)) {
134            Ok(_) => break,
135            Err(e) => {
136                if attempt == MAX_RETRIES {
137                    anyhow::bail!(
138                        "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
139                    );
140                }
141                eprintln!(
142                    "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
143                );
144                std::thread::sleep(Duration::from_millis(DELAY_MS));
145            }
146        }
147    }
148
149    let buf_reader = BufReader::new(file);
150    let decoder = GzDecoder::new(buf_reader);
151
152    Ok(ReaderBuilder::new()
153        .has_headers(true)
154        .from_reader(Box::new(decoder)))
155}
156
157/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
158/// automatically applying `GZip` decompression for files ending in ".gz".
159pub fn load_deltas<P: AsRef<Path>>(
160    filepath: P,
161    price_precision: Option<u8>,
162    size_precision: Option<u8>,
163    instrument_id: Option<InstrumentId>,
164    limit: Option<usize>,
165) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
166    // Infer precisions if not provided
167    let (price_precision, size_precision) = match (price_precision, size_precision) {
168        (Some(p), Some(s)) => (p, s),
169        (price_precision, size_precision) => {
170            let mut reader = create_csv_reader(&filepath)?;
171            let mut record = StringRecord::new();
172
173            let mut max_price_precision = 0u8;
174            let mut max_size_precision = 0u8;
175            let mut count = 0;
176
177            while reader.read_record(&mut record)? {
178                let parsed: TardisBookUpdateRecord = record.deserialize(None)?;
179
180                if price_precision.is_none() {
181                    max_price_precision = infer_precision(parsed.price).max(max_price_precision);
182                }
183
184                if size_precision.is_none() {
185                    max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
186                }
187
188                if let Some(limit) = limit {
189                    if count >= limit {
190                        break;
191                    }
192                    count += 1;
193                }
194            }
195
196            drop(reader);
197
198            max_price_precision = max_price_precision.min(FIXED_PRECISION);
199            max_size_precision = max_size_precision.min(FIXED_PRECISION);
200
201            (
202                price_precision.unwrap_or(max_price_precision),
203                size_precision.unwrap_or(max_size_precision),
204            )
205        }
206    };
207
208    let mut deltas: Vec<OrderBookDelta> = Vec::new();
209    let mut last_ts_event = UnixNanos::default();
210
211    let mut reader = create_csv_reader(filepath)?;
212    let mut record = StringRecord::new();
213
214    while reader.read_record(&mut record)? {
215        let record: TardisBookUpdateRecord = record.deserialize(None)?;
216
217        let instrument_id = match &instrument_id {
218            Some(id) => *id,
219            None => parse_instrument_id(&record.exchange, record.symbol),
220        };
221        let side = parse_order_side(&record.side);
222        let price = parse_price(record.price, price_precision);
223        let size = Quantity::new(record.amount, size_precision);
224        let order_id = 0; // Not applicable for L2 data
225        let order = BookOrder::new(side, price, size, order_id);
226
227        let action = parse_book_action(record.is_snapshot, size.as_f64());
228        let flags = 0; // Flags always zero until timestamp changes
229        let sequence = 0; // Sequence not available
230        let ts_event = parse_timestamp(record.timestamp);
231        let ts_init = parse_timestamp(record.local_timestamp);
232
233        // Check if timestamp is different from last timestamp
234        if last_ts_event != ts_event {
235            if let Some(last_delta) = deltas.last_mut() {
236                // Set previous delta flags as F_LAST
237                last_delta.flags = RecordFlag::F_LAST.value();
238            }
239        }
240
241        assert!(
242            !(action != BookAction::Delete && size.is_zero()),
243            "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {record:?}"
244        );
245
246        last_ts_event = ts_event;
247
248        let delta = OrderBookDelta::new(
249            instrument_id,
250            action,
251            order,
252            flags,
253            sequence,
254            ts_event,
255            ts_init,
256        );
257
258        deltas.push(delta);
259
260        if let Some(limit) = limit {
261            if deltas.len() >= limit {
262                break;
263            }
264        }
265    }
266
267    // Set F_LAST flag for final delta
268    if let Some(last_delta) = deltas.last_mut() {
269        last_delta.flags = RecordFlag::F_LAST.value();
270    }
271
272    Ok(deltas)
273}
274
275fn create_book_order(
276    side: OrderSide,
277    price: Option<f64>,
278    amount: Option<f64>,
279    price_precision: u8,
280    size_precision: u8,
281) -> (BookOrder, u32) {
282    match price {
283        Some(price) => (
284            BookOrder::new(
285                side,
286                parse_price(price, price_precision),
287                Quantity::new(amount.unwrap_or(0.0), size_precision),
288                0,
289            ),
290            1, // Count set to 1 if order exists
291        ),
292        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
293    }
294}
295
296/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
297/// automatically applying `GZip` decompression for files ending in ".gz".
298pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
299    filepath: P,
300    price_precision: Option<u8>,
301    size_precision: Option<u8>,
302    instrument_id: Option<InstrumentId>,
303    limit: Option<usize>,
304) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
305    // Infer precisions if not provided
306    let (price_precision, size_precision) = match (price_precision, size_precision) {
307        (Some(p), Some(s)) => (p, s),
308        (price_precision, size_precision) => {
309            let mut reader = create_csv_reader(&filepath)?;
310            let mut record = StringRecord::new();
311
312            let mut max_price_precision = 0u8;
313            let mut max_size_precision = 0u8;
314            let mut count = 0;
315
316            while reader.read_record(&mut record)? {
317                let parsed: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
318
319                if price_precision.is_none() {
320                    if let Some(bid_price) = parsed.bids_0_price {
321                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
322                    }
323                }
324
325                if size_precision.is_none() {
326                    if let Some(bid_amount) = parsed.bids_0_amount {
327                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
328                    }
329                }
330
331                if let Some(limit) = limit {
332                    if count >= limit {
333                        break;
334                    }
335                    count += 1;
336                }
337            }
338
339            drop(reader);
340
341            max_price_precision = max_price_precision.min(FIXED_PRECISION);
342            max_size_precision = max_size_precision.min(FIXED_PRECISION);
343
344            (
345                price_precision.unwrap_or(max_price_precision),
346                size_precision.unwrap_or(max_size_precision),
347            )
348        }
349    };
350
351    let mut depths: Vec<OrderBookDepth10> = Vec::new();
352
353    let mut reader = create_csv_reader(filepath)?;
354    let mut record = StringRecord::new();
355    while reader.read_record(&mut record)? {
356        let record: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
357        let instrument_id = match &instrument_id {
358            Some(id) => *id,
359            None => parse_instrument_id(&record.exchange, record.symbol),
360        };
361        let flags = RecordFlag::F_LAST.value();
362        let sequence = 0; // Sequence not available
363        let ts_event = parse_timestamp(record.timestamp);
364        let ts_init = parse_timestamp(record.local_timestamp);
365
366        // Initialize empty arrays
367        let mut bids = [NULL_ORDER; DEPTH10_LEN];
368        let mut asks = [NULL_ORDER; DEPTH10_LEN];
369        let mut bid_counts = [0u32; DEPTH10_LEN];
370        let mut ask_counts = [0u32; DEPTH10_LEN];
371
372        for i in 0..=4 {
373            // Create bids
374            let (bid_order, bid_count) = create_book_order(
375                OrderSide::Buy,
376                match i {
377                    0 => record.bids_0_price,
378                    1 => record.bids_1_price,
379                    2 => record.bids_2_price,
380                    3 => record.bids_3_price,
381                    4 => record.bids_4_price,
382                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
383                },
384                match i {
385                    0 => record.bids_0_amount,
386                    1 => record.bids_1_amount,
387                    2 => record.bids_2_amount,
388                    3 => record.bids_3_amount,
389                    4 => record.bids_4_amount,
390                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
391                },
392                price_precision,
393                size_precision,
394            );
395            bids[i] = bid_order;
396            bid_counts[i] = bid_count;
397
398            // Create asks
399            let (ask_order, ask_count) = create_book_order(
400                OrderSide::Sell,
401                match i {
402                    0 => record.asks_0_price,
403                    1 => record.asks_1_price,
404                    2 => record.asks_2_price,
405                    3 => record.asks_3_price,
406                    4 => record.asks_4_price,
407                    _ => None, // Unreachable, but for safety
408                },
409                match i {
410                    0 => record.asks_0_amount,
411                    1 => record.asks_1_amount,
412                    2 => record.asks_2_amount,
413                    3 => record.asks_3_amount,
414                    4 => record.asks_4_amount,
415                    _ => None, // Unreachable, but for safety
416                },
417                price_precision,
418                size_precision,
419            );
420            asks[i] = ask_order;
421            ask_counts[i] = ask_count;
422        }
423
424        let depth = OrderBookDepth10::new(
425            instrument_id,
426            bids,
427            asks,
428            bid_counts,
429            ask_counts,
430            flags,
431            sequence,
432            ts_event,
433            ts_init,
434        );
435
436        depths.push(depth);
437
438        if let Some(limit) = limit {
439            if depths.len() >= limit {
440                break;
441            }
442        }
443    }
444
445    Ok(depths)
446}
447
448/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
449/// automatically applying `GZip` decompression for files ending in ".gz".
450pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
451    filepath: P,
452    price_precision: Option<u8>,
453    size_precision: Option<u8>,
454    instrument_id: Option<InstrumentId>,
455    limit: Option<usize>,
456) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
457    // Infer precisions if not provided
458    let (price_precision, size_precision) = match (price_precision, size_precision) {
459        (Some(p), Some(s)) => (p, s),
460        (price_precision, size_precision) => {
461            let mut reader = create_csv_reader(&filepath)?;
462            let mut record = StringRecord::new();
463
464            let mut max_price_precision = 0u8;
465            let mut max_size_precision = 0u8;
466            let mut count = 0;
467
468            while reader.read_record(&mut record)? {
469                let parsed: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
470
471                if price_precision.is_none() {
472                    if let Some(bid_price) = parsed.bids_0_price {
473                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
474                    }
475                }
476
477                if size_precision.is_none() {
478                    if let Some(bid_amount) = parsed.bids_0_amount {
479                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
480                    }
481                }
482
483                if let Some(limit) = limit {
484                    if count >= limit {
485                        break;
486                    }
487                    count += 1;
488                }
489            }
490
491            drop(reader);
492
493            max_price_precision = max_price_precision.min(FIXED_PRECISION);
494            max_size_precision = max_size_precision.min(FIXED_PRECISION);
495
496            (
497                price_precision.unwrap_or(max_price_precision),
498                size_precision.unwrap_or(max_size_precision),
499            )
500        }
501    };
502
503    let mut depths: Vec<OrderBookDepth10> = Vec::new();
504    let mut reader = create_csv_reader(filepath)?;
505    let mut record = StringRecord::new();
506
507    while reader.read_record(&mut record)? {
508        let record: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
509
510        let instrument_id = match &instrument_id {
511            Some(id) => *id,
512            None => parse_instrument_id(&record.exchange, record.symbol),
513        };
514        let flags = RecordFlag::F_LAST.value();
515        let sequence = 0; // Sequence not available
516        let ts_event = parse_timestamp(record.timestamp);
517        let ts_init = parse_timestamp(record.local_timestamp);
518
519        // Initialize empty arrays for the first 10 levels only
520        let mut bids = [NULL_ORDER; DEPTH10_LEN];
521        let mut asks = [NULL_ORDER; DEPTH10_LEN];
522        let mut bid_counts = [0u32; DEPTH10_LEN];
523        let mut ask_counts = [0u32; DEPTH10_LEN];
524
525        // Fill only the first 10 levels from the 25-level record
526        for i in 0..DEPTH10_LEN {
527            // Create bids
528            let (bid_order, bid_count) = create_book_order(
529                OrderSide::Buy,
530                match i {
531                    0 => record.bids_0_price,
532                    1 => record.bids_1_price,
533                    2 => record.bids_2_price,
534                    3 => record.bids_3_price,
535                    4 => record.bids_4_price,
536                    5 => record.bids_5_price,
537                    6 => record.bids_6_price,
538                    7 => record.bids_7_price,
539                    8 => record.bids_8_price,
540                    9 => record.bids_9_price,
541                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
542                },
543                match i {
544                    0 => record.bids_0_amount,
545                    1 => record.bids_1_amount,
546                    2 => record.bids_2_amount,
547                    3 => record.bids_3_amount,
548                    4 => record.bids_4_amount,
549                    5 => record.bids_5_amount,
550                    6 => record.bids_6_amount,
551                    7 => record.bids_7_amount,
552                    8 => record.bids_8_amount,
553                    9 => record.bids_9_amount,
554                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
555                },
556                price_precision,
557                size_precision,
558            );
559            bids[i] = bid_order;
560            bid_counts[i] = bid_count;
561
562            // Create asks
563            let (ask_order, ask_count) = create_book_order(
564                OrderSide::Sell,
565                match i {
566                    0 => record.asks_0_price,
567                    1 => record.asks_1_price,
568                    2 => record.asks_2_price,
569                    3 => record.asks_3_price,
570                    4 => record.asks_4_price,
571                    5 => record.asks_5_price,
572                    6 => record.asks_6_price,
573                    7 => record.asks_7_price,
574                    8 => record.asks_8_price,
575                    9 => record.asks_9_price,
576                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
577                },
578                match i {
579                    0 => record.asks_0_amount,
580                    1 => record.asks_1_amount,
581                    2 => record.asks_2_amount,
582                    3 => record.asks_3_amount,
583                    4 => record.asks_4_amount,
584                    5 => record.asks_5_amount,
585                    6 => record.asks_6_amount,
586                    7 => record.asks_7_amount,
587                    8 => record.asks_8_amount,
588                    9 => record.asks_9_amount,
589                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
590                },
591                price_precision,
592                size_precision,
593            );
594            asks[i] = ask_order;
595            ask_counts[i] = ask_count;
596        }
597
598        let depth = OrderBookDepth10::new(
599            instrument_id,
600            bids,
601            asks,
602            bid_counts,
603            ask_counts,
604            flags,
605            sequence,
606            ts_event,
607            ts_init,
608        );
609
610        depths.push(depth);
611
612        if let Some(limit) = limit {
613            if depths.len() >= limit {
614                break;
615            }
616        }
617    }
618
619    Ok(depths)
620}
621
622/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
623/// automatically applying `GZip` decompression for files ending in ".gz".
624pub fn load_quote_ticks<P: AsRef<Path>>(
625    filepath: P,
626    price_precision: Option<u8>,
627    size_precision: Option<u8>,
628    instrument_id: Option<InstrumentId>,
629    limit: Option<usize>,
630) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
631    // Infer precisions if not provided
632    let (price_precision, size_precision) = match (price_precision, size_precision) {
633        (Some(p), Some(s)) => (p, s),
634        (price_precision, size_precision) => {
635            let mut reader = create_csv_reader(&filepath)?;
636            let mut record = StringRecord::new();
637
638            let mut max_price_precision = 0u8;
639            let mut max_size_precision = 0u8;
640            let mut count = 0;
641
642            while reader.read_record(&mut record)? {
643                let parsed: TardisQuoteRecord = record.deserialize(None)?;
644
645                if price_precision.is_none() {
646                    if let Some(bid_price) = parsed.bid_price {
647                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
648                    }
649                }
650
651                if size_precision.is_none() {
652                    if let Some(bid_amount) = parsed.bid_amount {
653                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
654                    }
655                }
656
657                if let Some(limit) = limit {
658                    if count >= limit {
659                        break;
660                    }
661                    count += 1;
662                }
663            }
664
665            drop(reader);
666
667            max_price_precision = max_price_precision.min(FIXED_PRECISION);
668            max_size_precision = max_size_precision.min(FIXED_PRECISION);
669
670            (
671                price_precision.unwrap_or(max_price_precision),
672                size_precision.unwrap_or(max_size_precision),
673            )
674        }
675    };
676
677    let mut quotes = Vec::new();
678    let mut reader = create_csv_reader(filepath)?;
679    let mut record = StringRecord::new();
680
681    while reader.read_record(&mut record)? {
682        let record: TardisQuoteRecord = record.deserialize(None)?;
683
684        let instrument_id = match &instrument_id {
685            Some(id) => *id,
686            None => parse_instrument_id(&record.exchange, record.symbol),
687        };
688        let bid_price = parse_price(record.bid_price.unwrap_or(0.0), price_precision);
689        let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
690        let ask_price = parse_price(record.ask_price.unwrap_or(0.0), price_precision);
691        let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
692        let ts_event = parse_timestamp(record.timestamp);
693        let ts_init = parse_timestamp(record.local_timestamp);
694
695        let quote = QuoteTick::new(
696            instrument_id,
697            bid_price,
698            ask_price,
699            bid_size,
700            ask_size,
701            ts_event,
702            ts_init,
703        );
704
705        quotes.push(quote);
706
707        if let Some(limit) = limit {
708            if quotes.len() >= limit {
709                break;
710            }
711        }
712    }
713
714    Ok(quotes)
715}
716
717/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
718/// automatically applying `GZip` decompression for files ending in ".gz".
719pub fn load_trade_ticks<P: AsRef<Path>>(
720    filepath: P,
721    price_precision: Option<u8>,
722    size_precision: Option<u8>,
723    instrument_id: Option<InstrumentId>,
724    limit: Option<usize>,
725) -> Result<Vec<TradeTick>, Box<dyn Error>> {
726    // Infer precisions if not provided
727    let (price_precision, size_precision) = match (price_precision, size_precision) {
728        (Some(p), Some(s)) => (p, s),
729        (price_precision, size_precision) => {
730            let mut reader = create_csv_reader(&filepath)?;
731            let mut record = StringRecord::new();
732
733            let mut max_price_precision = 0u8;
734            let mut max_size_precision = 0u8;
735            let mut count = 0;
736
737            while reader.read_record(&mut record)? {
738                let parsed: TardisTradeRecord = record.deserialize(None)?;
739
740                if price_precision.is_none() {
741                    max_price_precision = infer_precision(parsed.price).max(max_price_precision);
742                }
743
744                if size_precision.is_none() {
745                    max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
746                }
747
748                if let Some(limit) = limit {
749                    if count >= limit {
750                        break;
751                    }
752                    count += 1;
753                }
754            }
755
756            drop(reader);
757
758            max_price_precision = max_price_precision.min(FIXED_PRECISION);
759            max_size_precision = max_size_precision.min(FIXED_PRECISION);
760
761            (
762                price_precision.unwrap_or(max_price_precision),
763                size_precision.unwrap_or(max_size_precision),
764            )
765        }
766    };
767
768    let mut trades = Vec::new();
769    let mut reader = create_csv_reader(filepath)?;
770    let mut record = StringRecord::new();
771
772    while reader.read_record(&mut record)? {
773        let record: TardisTradeRecord = record.deserialize(None)?;
774
775        let instrument_id = match &instrument_id {
776            Some(id) => *id,
777            None => parse_instrument_id(&record.exchange, record.symbol),
778        };
779        let price = parse_price(record.price, price_precision);
780        let size = Quantity::new(record.amount, size_precision);
781        let aggressor_side = parse_aggressor_side(&record.side);
782        let trade_id = TradeId::new(&record.id);
783        let ts_event = parse_timestamp(record.timestamp);
784        let ts_init = parse_timestamp(record.local_timestamp);
785
786        let trade = TradeTick::new(
787            instrument_id,
788            price,
789            size,
790            aggressor_side,
791            trade_id,
792            ts_event,
793            ts_init,
794        );
795
796        trades.push(trade);
797
798        if let Some(limit) = limit {
799            if trades.len() >= limit {
800                break;
801            }
802        }
803    }
804
805    Ok(trades)
806}
807
808////////////////////////////////////////////////////////////////////////////////
809// Tests
810////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::InstrumentId,
        types::Price,
    };
    use nautilus_testkit::common::{
        ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
        ensure_data_exists_tardis_huobi_quotes,
    };
    use rstest::*;

    use super::*;

    // Every test in this module follows the same shape: obtain a data file path from the
    // test kit, run the matching loader with `Some(10_000)` as its final argument, check
    // that exactly 10,000 records came back, then pin the fields of the first record.
    // Each test runs twice: once with explicit precisions and once with `None` for both.

    // Pins the first delta produced by `load_deltas`.
    // TODO: Flaky in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_deltas(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let csv_path = ensure_data_exists_tardis_deribit_book_l2();
        let limit = Some(10_000);
        let deltas =
            load_deltas(csv_path, price_precision, size_precision, None, limit).unwrap();

        assert_eq!(deltas.len(), 10_000);

        // Borrow the first record once instead of re-indexing for every assertion.
        let first = &deltas[0];
        let expected_instrument = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
        assert_eq!(first.instrument_id, expected_instrument);
        assert_eq!(first.action, BookAction::Add);

        let order = &first.order;
        assert_eq!(order.side, OrderSide::Sell);
        assert_eq!(order.price, Price::from("6421.5"));
        assert_eq!(order.size, Quantity::from("18640"));

        assert_eq!(first.flags, 0);
        assert_eq!(first.sequence, 0);
        assert_eq!(first.ts_event, 1585699200245000000);
        assert_eq!(first.ts_init, 1585699200355684000);
    }

    // Pins the first depth record produced by `load_depth10_from_snapshot5`.
    // TODO: Flaky in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_depth10s_from_snapshot5(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let csv_path = ensure_data_exists_tardis_binance_snapshot5();
        let limit = Some(10_000);
        let depths =
            load_depth10_from_snapshot5(csv_path, price_precision, size_precision, None, limit)
                .unwrap();

        assert_eq!(depths.len(), 10_000);

        let first = &depths[0];
        let expected_instrument = InstrumentId::from("BTCUSDT.BINANCE");
        assert_eq!(first.instrument_id, expected_instrument);

        // Bid side: number of levels, then the top-of-book level.
        assert_eq!(first.bids.len(), 10);
        let best_bid = &first.bids[0];
        assert_eq!(best_bid.price, Price::from("11657.07"));
        assert_eq!(best_bid.size, Quantity::from("10.896"));
        assert_eq!(best_bid.side, OrderSide::Buy);
        assert_eq!(best_bid.order_id, 0);

        // Ask side: number of levels, then the top-of-book level.
        assert_eq!(first.asks.len(), 10);
        let best_ask = &first.asks[0];
        assert_eq!(best_ask.price, Price::from("11657.08"));
        assert_eq!(best_ask.size, Quantity::from("1.714"));
        assert_eq!(best_ask.side, OrderSide::Sell);
        assert_eq!(best_ask.order_id, 0);

        assert_eq!(first.bid_counts[0], 1);
        assert_eq!(first.ask_counts[0], 1);
        assert_eq!(first.flags, 128);
        assert_eq!(first.ts_event, 1598918403696000000);
        assert_eq!(first.ts_init, 1598918403810979000);
        assert_eq!(first.sequence, 0);
    }

    // Pins the first depth record produced by `load_depth10_from_snapshot25`.
    // TODO: Flaky in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_depth10s_from_snapshot25(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let csv_path = ensure_data_exists_tardis_binance_snapshot25();
        let limit = Some(10_000);
        let depths =
            load_depth10_from_snapshot25(csv_path, price_precision, size_precision, None, limit)
                .unwrap();

        assert_eq!(depths.len(), 10_000);

        let first = &depths[0];
        let expected_instrument = InstrumentId::from("BTCUSDT.BINANCE");
        assert_eq!(first.instrument_id, expected_instrument);

        // Bid side: number of levels, then the top-of-book level.
        assert_eq!(first.bids.len(), 10);
        let best_bid = &first.bids[0];
        assert_eq!(best_bid.price, Price::from("11657.07"));
        assert_eq!(best_bid.size, Quantity::from("10.896"));
        assert_eq!(best_bid.side, OrderSide::Buy);
        assert_eq!(best_bid.order_id, 0);

        // Ask side: number of levels, then the top-of-book level.
        assert_eq!(first.asks.len(), 10);
        let best_ask = &first.asks[0];
        assert_eq!(best_ask.price, Price::from("11657.08"));
        assert_eq!(best_ask.size, Quantity::from("1.714"));
        assert_eq!(best_ask.side, OrderSide::Sell);
        assert_eq!(best_ask.order_id, 0);

        assert_eq!(first.bid_counts[0], 1);
        assert_eq!(first.ask_counts[0], 1);
        assert_eq!(first.flags, 128);
        assert_eq!(first.ts_event, 1598918403696000000);
        assert_eq!(first.ts_init, 1598918403810979000);
        assert_eq!(first.sequence, 0);
    }

    // Pins the first quote produced by `load_quote_ticks`.
    // TODO: Flaky in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_quotes(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let csv_path = ensure_data_exists_tardis_huobi_quotes();
        let limit = Some(10_000);
        let quotes =
            load_quote_ticks(csv_path, price_precision, size_precision, None, limit).unwrap();

        assert_eq!(quotes.len(), 10_000);

        let first = &quotes[0];
        let expected_instrument = InstrumentId::from("BTC-USD.HUOBI_DELIVERY");
        assert_eq!(first.instrument_id, expected_instrument);
        assert_eq!(first.bid_price, Price::from("8629.2"));
        assert_eq!(first.bid_size, Quantity::from("806"));
        assert_eq!(first.ask_price, Price::from("8629.3"));
        assert_eq!(first.ask_size, Quantity::from("5494"));
        assert_eq!(first.ts_event, 1588291201099000000);
        assert_eq!(first.ts_init, 1588291201234268000);
    }

    // Pins the first trade produced by `load_trade_ticks`.
    // TODO: Flaky in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_trades(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let csv_path = ensure_data_exists_tardis_bitmex_trades();
        let limit = Some(10_000);
        let trades =
            load_trade_ticks(csv_path, price_precision, size_precision, None, limit).unwrap();

        assert_eq!(trades.len(), 10_000);

        let first = &trades[0];
        let expected_instrument = InstrumentId::from("XBTUSD.BITMEX");
        let expected_trade_id = TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf");
        assert_eq!(first.instrument_id, expected_instrument);
        assert_eq!(first.price, Price::from("8531.5"));
        assert_eq!(first.size, Quantity::from("2152"));
        assert_eq!(first.aggressor_side, AggressorSide::Seller);
        assert_eq!(first.trade_id, expected_trade_id);
        assert_eq!(first.ts_event, 1583020803145000000);
        assert_eq!(first.ts_init, 1583020803307160000);
    }
}