nautilus_tardis/csv/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16mod record;
17
18use std::{
19    error::Error,
20    ffi::OsStr,
21    fs::File,
22    io::{BufReader, Read, Seek, SeekFrom},
23    path::Path,
24    time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31    data::{
32        BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33    },
34    enums::{BookAction, OrderSide, RecordFlag},
35    identifiers::{InstrumentId, TradeId},
36    types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40    csv::record::{
41        TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42        TardisQuoteRecord, TardisTradeRecord,
43    },
44    parse::{
45        parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46        parse_timestamp,
47    },
48};
49use crate::parse::parse_price;
50
/// Infers the number of decimal places in `value` from its `Display`
/// representation (Rust prints `f64` in plain decimal notation, never exponent form).
///
/// Values whose representation has no `.` (whole numbers, `NaN`, infinities)
/// yield `0`. Values that would need more than `u8::MAX` decimals saturate at
/// `u8::MAX` instead of silently wrapping; callers clamp the result to
/// `FIXED_PRECISION`.
fn infer_precision(value: f64) -> u8 {
    let str_value = value.to_string(); // Single allocation
    str_value.find('.').map_or(0, |decimal_idx| {
        let decimals = str_value.len() - decimal_idx - 1;
        // Saturate rather than wrap: `300 as u8` would be 44, understating precision
        u8::try_from(decimals).unwrap_or(u8::MAX)
    })
}
58
59fn create_csv_reader<P: AsRef<Path>>(
60    filepath: P,
61) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
62    let filepath_ref = filepath.as_ref();
63    const MAX_RETRIES: u8 = 3;
64    const DELAY_MS: u64 = 100;
65
66    fn open_file_with_retry<P: AsRef<Path>>(
67        path: P,
68        max_retries: u8,
69        delay_ms: u64,
70    ) -> anyhow::Result<File> {
71        let path_ref = path.as_ref();
72        for attempt in 1..=max_retries {
73            match File::open(path_ref) {
74                Ok(file) => return Ok(file),
75                Err(e) => {
76                    if attempt == max_retries {
77                        anyhow::bail!(
78                            "Failed to open file '{}' after {max_retries} attempts: {e}",
79                            path_ref.display(),
80                        );
81                    }
82                    eprintln!(
83                        "Attempt {attempt}/{max_retries} failed to open file '{}': {e}. Retrying after {delay_ms}ms...",
84                        path_ref.display(),
85                    );
86                    std::thread::sleep(Duration::from_millis(delay_ms));
87                }
88            }
89        }
90        unreachable!("Loop should return either Ok or Err");
91    }
92
93    let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
94
95    let is_gzipped = filepath_ref
96        .extension()
97        .and_then(OsStr::to_str)
98        .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
99
100    if !is_gzipped {
101        let buf_reader = BufReader::new(file);
102        return Ok(ReaderBuilder::new()
103            .has_headers(true)
104            .from_reader(Box::new(buf_reader)));
105    }
106
107    let file_size = file.metadata()?.len();
108    if file_size < 2 {
109        anyhow::bail!("File too small to be a valid gzip file");
110    }
111
112    let mut header_buf = [0u8; 2];
113    for attempt in 1..=MAX_RETRIES {
114        match file.read_exact(&mut header_buf) {
115            Ok(()) => break,
116            Err(e) => {
117                if attempt == MAX_RETRIES {
118                    anyhow::bail!(
119                        "Failed to read gzip header from '{}' after {MAX_RETRIES} attempts: {e}",
120                        filepath_ref.display(),
121                    );
122                }
123                eprintln!(
124                    "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{}': {e}. Retrying after {DELAY_MS}ms...",
125                    filepath_ref.display(),
126                );
127                std::thread::sleep(Duration::from_millis(DELAY_MS));
128            }
129        }
130    }
131
132    if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
133        anyhow::bail!(
134            "File '{}' has .gz extension but invalid gzip header",
135            filepath_ref.display(),
136        );
137    }
138
139    for attempt in 1..=MAX_RETRIES {
140        match file.seek(SeekFrom::Start(0)) {
141            Ok(_) => break,
142            Err(e) => {
143                if attempt == MAX_RETRIES {
144                    anyhow::bail!(
145                        "Failed to reset file position for '{}' after {MAX_RETRIES} attempts: {e}",
146                        filepath_ref.display(),
147                    );
148                }
149                eprintln!(
150                    "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{}': {e}. Retrying after {DELAY_MS}ms...",
151                    filepath_ref.display(),
152                );
153                std::thread::sleep(Duration::from_millis(DELAY_MS));
154            }
155        }
156    }
157
158    let buf_reader = BufReader::new(file);
159    let decoder = GzDecoder::new(buf_reader);
160
161    Ok(ReaderBuilder::new()
162        .has_headers(true)
163        .from_reader(Box::new(decoder)))
164}
165
166/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
167/// automatically applying `GZip` decompression for files ending in ".gz".
168pub fn load_deltas<P: AsRef<Path>>(
169    filepath: P,
170    price_precision: Option<u8>,
171    size_precision: Option<u8>,
172    instrument_id: Option<InstrumentId>,
173    limit: Option<usize>,
174) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
175    // Infer precisions if not provided
176    let (price_precision, size_precision) = match (price_precision, size_precision) {
177        (Some(p), Some(s)) => (p, s),
178        (price_precision, size_precision) => {
179            let mut reader = create_csv_reader(&filepath)?;
180            let mut record = StringRecord::new();
181
182            let mut max_price_precision = 0u8;
183            let mut max_size_precision = 0u8;
184            let mut count = 0;
185
186            while reader.read_record(&mut record)? {
187                let parsed: TardisBookUpdateRecord = record.deserialize(None)?;
188
189                if price_precision.is_none() {
190                    max_price_precision = infer_precision(parsed.price).max(max_price_precision);
191                }
192
193                if size_precision.is_none() {
194                    max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
195                }
196
197                if let Some(limit) = limit {
198                    if count >= limit {
199                        break;
200                    }
201                    count += 1;
202                }
203            }
204
205            drop(reader);
206
207            max_price_precision = max_price_precision.min(FIXED_PRECISION);
208            max_size_precision = max_size_precision.min(FIXED_PRECISION);
209
210            (
211                price_precision.unwrap_or(max_price_precision),
212                size_precision.unwrap_or(max_size_precision),
213            )
214        }
215    };
216
217    let mut deltas: Vec<OrderBookDelta> = Vec::new();
218    let mut last_ts_event = UnixNanos::default();
219
220    let mut reader = create_csv_reader(filepath)?;
221    let mut record = StringRecord::new();
222
223    while reader.read_record(&mut record)? {
224        let record: TardisBookUpdateRecord = record.deserialize(None)?;
225
226        let instrument_id = match &instrument_id {
227            Some(id) => *id,
228            None => parse_instrument_id(&record.exchange, record.symbol),
229        };
230        let side = parse_order_side(&record.side);
231        let price = parse_price(record.price, price_precision);
232        let size = Quantity::new(record.amount, size_precision);
233        let order_id = 0; // Not applicable for L2 data
234        let order = BookOrder::new(side, price, size, order_id);
235
236        let action = parse_book_action(record.is_snapshot, size.as_f64());
237        let flags = 0; // Flags always zero until timestamp changes
238        let sequence = 0; // Sequence not available
239        let ts_event = parse_timestamp(record.timestamp);
240        let ts_init = parse_timestamp(record.local_timestamp);
241
242        // Check if timestamp is different from last timestamp
243        if last_ts_event != ts_event {
244            if let Some(last_delta) = deltas.last_mut() {
245                // Set previous delta flags as F_LAST
246                last_delta.flags = RecordFlag::F_LAST.value();
247            }
248        }
249
250        assert!(
251            !(action != BookAction::Delete && size.is_zero()),
252            "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {record:?}"
253        );
254
255        last_ts_event = ts_event;
256
257        let delta = OrderBookDelta::new(
258            instrument_id,
259            action,
260            order,
261            flags,
262            sequence,
263            ts_event,
264            ts_init,
265        );
266
267        deltas.push(delta);
268
269        if let Some(limit) = limit {
270            if deltas.len() >= limit {
271                break;
272            }
273        }
274    }
275
276    // Set F_LAST flag for final delta
277    if let Some(last_delta) = deltas.last_mut() {
278        last_delta.flags = RecordFlag::F_LAST.value();
279    }
280
281    Ok(deltas)
282}
283
284fn create_book_order(
285    side: OrderSide,
286    price: Option<f64>,
287    amount: Option<f64>,
288    price_precision: u8,
289    size_precision: u8,
290) -> (BookOrder, u32) {
291    match price {
292        Some(price) => (
293            BookOrder::new(
294                side,
295                parse_price(price, price_precision),
296                Quantity::new(amount.unwrap_or(0.0), size_precision),
297                0,
298            ),
299            1, // Count set to 1 if order exists
300        ),
301        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
302    }
303}
304
305/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
306/// automatically applying `GZip` decompression for files ending in ".gz".
307pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
308    filepath: P,
309    price_precision: Option<u8>,
310    size_precision: Option<u8>,
311    instrument_id: Option<InstrumentId>,
312    limit: Option<usize>,
313) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
314    // Infer precisions if not provided
315    let (price_precision, size_precision) = match (price_precision, size_precision) {
316        (Some(p), Some(s)) => (p, s),
317        (price_precision, size_precision) => {
318            let mut reader = create_csv_reader(&filepath)?;
319            let mut record = StringRecord::new();
320
321            let mut max_price_precision = 0u8;
322            let mut max_size_precision = 0u8;
323            let mut count = 0;
324
325            while reader.read_record(&mut record)? {
326                let parsed: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
327
328                if price_precision.is_none() {
329                    if let Some(bid_price) = parsed.bids_0_price {
330                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
331                    }
332                }
333
334                if size_precision.is_none() {
335                    if let Some(bid_amount) = parsed.bids_0_amount {
336                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
337                    }
338                }
339
340                if let Some(limit) = limit {
341                    if count >= limit {
342                        break;
343                    }
344                    count += 1;
345                }
346            }
347
348            drop(reader);
349
350            max_price_precision = max_price_precision.min(FIXED_PRECISION);
351            max_size_precision = max_size_precision.min(FIXED_PRECISION);
352
353            (
354                price_precision.unwrap_or(max_price_precision),
355                size_precision.unwrap_or(max_size_precision),
356            )
357        }
358    };
359
360    let mut depths: Vec<OrderBookDepth10> = Vec::new();
361
362    let mut reader = create_csv_reader(filepath)?;
363    let mut record = StringRecord::new();
364    while reader.read_record(&mut record)? {
365        let record: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
366        let instrument_id = match &instrument_id {
367            Some(id) => *id,
368            None => parse_instrument_id(&record.exchange, record.symbol),
369        };
370        let flags = RecordFlag::F_LAST.value();
371        let sequence = 0; // Sequence not available
372        let ts_event = parse_timestamp(record.timestamp);
373        let ts_init = parse_timestamp(record.local_timestamp);
374
375        // Initialize empty arrays
376        let mut bids = [NULL_ORDER; DEPTH10_LEN];
377        let mut asks = [NULL_ORDER; DEPTH10_LEN];
378        let mut bid_counts = [0u32; DEPTH10_LEN];
379        let mut ask_counts = [0u32; DEPTH10_LEN];
380
381        for i in 0..=4 {
382            // Create bids
383            let (bid_order, bid_count) = create_book_order(
384                OrderSide::Buy,
385                match i {
386                    0 => record.bids_0_price,
387                    1 => record.bids_1_price,
388                    2 => record.bids_2_price,
389                    3 => record.bids_3_price,
390                    4 => record.bids_4_price,
391                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
392                },
393                match i {
394                    0 => record.bids_0_amount,
395                    1 => record.bids_1_amount,
396                    2 => record.bids_2_amount,
397                    3 => record.bids_3_amount,
398                    4 => record.bids_4_amount,
399                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
400                },
401                price_precision,
402                size_precision,
403            );
404            bids[i] = bid_order;
405            bid_counts[i] = bid_count;
406
407            // Create asks
408            let (ask_order, ask_count) = create_book_order(
409                OrderSide::Sell,
410                match i {
411                    0 => record.asks_0_price,
412                    1 => record.asks_1_price,
413                    2 => record.asks_2_price,
414                    3 => record.asks_3_price,
415                    4 => record.asks_4_price,
416                    _ => None, // Unreachable, but for safety
417                },
418                match i {
419                    0 => record.asks_0_amount,
420                    1 => record.asks_1_amount,
421                    2 => record.asks_2_amount,
422                    3 => record.asks_3_amount,
423                    4 => record.asks_4_amount,
424                    _ => None, // Unreachable, but for safety
425                },
426                price_precision,
427                size_precision,
428            );
429            asks[i] = ask_order;
430            ask_counts[i] = ask_count;
431        }
432
433        let depth = OrderBookDepth10::new(
434            instrument_id,
435            bids,
436            asks,
437            bid_counts,
438            ask_counts,
439            flags,
440            sequence,
441            ts_event,
442            ts_init,
443        );
444
445        depths.push(depth);
446
447        if let Some(limit) = limit {
448            if depths.len() >= limit {
449                break;
450            }
451        }
452    }
453
454    Ok(depths)
455}
456
457/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
458/// automatically applying `GZip` decompression for files ending in ".gz".
459pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
460    filepath: P,
461    price_precision: Option<u8>,
462    size_precision: Option<u8>,
463    instrument_id: Option<InstrumentId>,
464    limit: Option<usize>,
465) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
466    // Infer precisions if not provided
467    let (price_precision, size_precision) = match (price_precision, size_precision) {
468        (Some(p), Some(s)) => (p, s),
469        (price_precision, size_precision) => {
470            let mut reader = create_csv_reader(&filepath)?;
471            let mut record = StringRecord::new();
472
473            let mut max_price_precision = 0u8;
474            let mut max_size_precision = 0u8;
475            let mut count = 0;
476
477            while reader.read_record(&mut record)? {
478                let parsed: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
479
480                if price_precision.is_none() {
481                    if let Some(bid_price) = parsed.bids_0_price {
482                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
483                    }
484                }
485
486                if size_precision.is_none() {
487                    if let Some(bid_amount) = parsed.bids_0_amount {
488                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
489                    }
490                }
491
492                if let Some(limit) = limit {
493                    if count >= limit {
494                        break;
495                    }
496                    count += 1;
497                }
498            }
499
500            drop(reader);
501
502            max_price_precision = max_price_precision.min(FIXED_PRECISION);
503            max_size_precision = max_size_precision.min(FIXED_PRECISION);
504
505            (
506                price_precision.unwrap_or(max_price_precision),
507                size_precision.unwrap_or(max_size_precision),
508            )
509        }
510    };
511
512    let mut depths: Vec<OrderBookDepth10> = Vec::new();
513    let mut reader = create_csv_reader(filepath)?;
514    let mut record = StringRecord::new();
515
516    while reader.read_record(&mut record)? {
517        let record: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
518
519        let instrument_id = match &instrument_id {
520            Some(id) => *id,
521            None => parse_instrument_id(&record.exchange, record.symbol),
522        };
523        let flags = RecordFlag::F_LAST.value();
524        let sequence = 0; // Sequence not available
525        let ts_event = parse_timestamp(record.timestamp);
526        let ts_init = parse_timestamp(record.local_timestamp);
527
528        // Initialize empty arrays for the first 10 levels only
529        let mut bids = [NULL_ORDER; DEPTH10_LEN];
530        let mut asks = [NULL_ORDER; DEPTH10_LEN];
531        let mut bid_counts = [0u32; DEPTH10_LEN];
532        let mut ask_counts = [0u32; DEPTH10_LEN];
533
534        // Fill only the first 10 levels from the 25-level record
535        for i in 0..DEPTH10_LEN {
536            // Create bids
537            let (bid_order, bid_count) = create_book_order(
538                OrderSide::Buy,
539                match i {
540                    0 => record.bids_0_price,
541                    1 => record.bids_1_price,
542                    2 => record.bids_2_price,
543                    3 => record.bids_3_price,
544                    4 => record.bids_4_price,
545                    5 => record.bids_5_price,
546                    6 => record.bids_6_price,
547                    7 => record.bids_7_price,
548                    8 => record.bids_8_price,
549                    9 => record.bids_9_price,
550                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
551                },
552                match i {
553                    0 => record.bids_0_amount,
554                    1 => record.bids_1_amount,
555                    2 => record.bids_2_amount,
556                    3 => record.bids_3_amount,
557                    4 => record.bids_4_amount,
558                    5 => record.bids_5_amount,
559                    6 => record.bids_6_amount,
560                    7 => record.bids_7_amount,
561                    8 => record.bids_8_amount,
562                    9 => record.bids_9_amount,
563                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
564                },
565                price_precision,
566                size_precision,
567            );
568            bids[i] = bid_order;
569            bid_counts[i] = bid_count;
570
571            // Create asks
572            let (ask_order, ask_count) = create_book_order(
573                OrderSide::Sell,
574                match i {
575                    0 => record.asks_0_price,
576                    1 => record.asks_1_price,
577                    2 => record.asks_2_price,
578                    3 => record.asks_3_price,
579                    4 => record.asks_4_price,
580                    5 => record.asks_5_price,
581                    6 => record.asks_6_price,
582                    7 => record.asks_7_price,
583                    8 => record.asks_8_price,
584                    9 => record.asks_9_price,
585                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
586                },
587                match i {
588                    0 => record.asks_0_amount,
589                    1 => record.asks_1_amount,
590                    2 => record.asks_2_amount,
591                    3 => record.asks_3_amount,
592                    4 => record.asks_4_amount,
593                    5 => record.asks_5_amount,
594                    6 => record.asks_6_amount,
595                    7 => record.asks_7_amount,
596                    8 => record.asks_8_amount,
597                    9 => record.asks_9_amount,
598                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
599                },
600                price_precision,
601                size_precision,
602            );
603            asks[i] = ask_order;
604            ask_counts[i] = ask_count;
605        }
606
607        let depth = OrderBookDepth10::new(
608            instrument_id,
609            bids,
610            asks,
611            bid_counts,
612            ask_counts,
613            flags,
614            sequence,
615            ts_event,
616            ts_init,
617        );
618
619        depths.push(depth);
620
621        if let Some(limit) = limit {
622            if depths.len() >= limit {
623                break;
624            }
625        }
626    }
627
628    Ok(depths)
629}
630
631/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
632/// automatically applying `GZip` decompression for files ending in ".gz".
633pub fn load_quote_ticks<P: AsRef<Path>>(
634    filepath: P,
635    price_precision: Option<u8>,
636    size_precision: Option<u8>,
637    instrument_id: Option<InstrumentId>,
638    limit: Option<usize>,
639) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
640    // Infer precisions if not provided
641    let (price_precision, size_precision) = match (price_precision, size_precision) {
642        (Some(p), Some(s)) => (p, s),
643        (price_precision, size_precision) => {
644            let mut reader = create_csv_reader(&filepath)?;
645            let mut record = StringRecord::new();
646
647            let mut max_price_precision = 0u8;
648            let mut max_size_precision = 0u8;
649            let mut count = 0;
650
651            while reader.read_record(&mut record)? {
652                let parsed: TardisQuoteRecord = record.deserialize(None)?;
653
654                if price_precision.is_none() {
655                    if let Some(bid_price) = parsed.bid_price {
656                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
657                    }
658                }
659
660                if size_precision.is_none() {
661                    if let Some(bid_amount) = parsed.bid_amount {
662                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
663                    }
664                }
665
666                if let Some(limit) = limit {
667                    if count >= limit {
668                        break;
669                    }
670                    count += 1;
671                }
672            }
673
674            drop(reader);
675
676            max_price_precision = max_price_precision.min(FIXED_PRECISION);
677            max_size_precision = max_size_precision.min(FIXED_PRECISION);
678
679            (
680                price_precision.unwrap_or(max_price_precision),
681                size_precision.unwrap_or(max_size_precision),
682            )
683        }
684    };
685
686    let mut quotes = Vec::new();
687    let mut reader = create_csv_reader(filepath)?;
688    let mut record = StringRecord::new();
689
690    while reader.read_record(&mut record)? {
691        let record: TardisQuoteRecord = record.deserialize(None)?;
692
693        let instrument_id = match &instrument_id {
694            Some(id) => *id,
695            None => parse_instrument_id(&record.exchange, record.symbol),
696        };
697        let bid_price = parse_price(record.bid_price.unwrap_or(0.0), price_precision);
698        let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
699        let ask_price = parse_price(record.ask_price.unwrap_or(0.0), price_precision);
700        let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
701        let ts_event = parse_timestamp(record.timestamp);
702        let ts_init = parse_timestamp(record.local_timestamp);
703
704        let quote = QuoteTick::new(
705            instrument_id,
706            bid_price,
707            ask_price,
708            bid_size,
709            ask_size,
710            ts_event,
711            ts_init,
712        );
713
714        quotes.push(quote);
715
716        if let Some(limit) = limit {
717            if quotes.len() >= limit {
718                break;
719            }
720        }
721    }
722
723    Ok(quotes)
724}
725
726/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
727/// automatically applying `GZip` decompression for files ending in ".gz".
728pub fn load_trade_ticks<P: AsRef<Path>>(
729    filepath: P,
730    price_precision: Option<u8>,
731    size_precision: Option<u8>,
732    instrument_id: Option<InstrumentId>,
733    limit: Option<usize>,
734) -> Result<Vec<TradeTick>, Box<dyn Error>> {
735    // Infer precisions if not provided
736    let (price_precision, size_precision) = match (price_precision, size_precision) {
737        (Some(p), Some(s)) => (p, s),
738        (price_precision, size_precision) => {
739            let mut reader = create_csv_reader(&filepath)?;
740            let mut record = StringRecord::new();
741
742            let mut max_price_precision = 0u8;
743            let mut max_size_precision = 0u8;
744            let mut count = 0;
745
746            while reader.read_record(&mut record)? {
747                let parsed: TardisTradeRecord = record.deserialize(None)?;
748
749                if price_precision.is_none() {
750                    max_price_precision = infer_precision(parsed.price).max(max_price_precision);
751                }
752
753                if size_precision.is_none() {
754                    max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
755                }
756
757                if let Some(limit) = limit {
758                    if count >= limit {
759                        break;
760                    }
761                    count += 1;
762                }
763            }
764
765            drop(reader);
766
767            max_price_precision = max_price_precision.min(FIXED_PRECISION);
768            max_size_precision = max_size_precision.min(FIXED_PRECISION);
769
770            (
771                price_precision.unwrap_or(max_price_precision),
772                size_precision.unwrap_or(max_size_precision),
773            )
774        }
775    };
776
777    let mut trades = Vec::new();
778    let mut reader = create_csv_reader(filepath)?;
779    let mut record = StringRecord::new();
780
781    while reader.read_record(&mut record)? {
782        let record: TardisTradeRecord = record.deserialize(None)?;
783
784        let instrument_id = match &instrument_id {
785            Some(id) => *id,
786            None => parse_instrument_id(&record.exchange, record.symbol),
787        };
788        let price = parse_price(record.price, price_precision);
789        let size = Quantity::new(record.amount, size_precision);
790        let aggressor_side = parse_aggressor_side(&record.side);
791        let trade_id = TradeId::new(&record.id);
792        let ts_event = parse_timestamp(record.timestamp);
793        let ts_init = parse_timestamp(record.local_timestamp);
794
795        let trade = TradeTick::new(
796            instrument_id,
797            price,
798            size,
799            aggressor_side,
800            trade_id,
801            ts_event,
802            ts_init,
803        );
804
805        trades.push(trade);
806
807        if let Some(limit) = limit {
808            if trades.len() >= limit {
809                break;
810            }
811        }
812    }
813
814    Ok(trades)
815}
816
817////////////////////////////////////////////////////////////////////////////////
818// Tests
819////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    //! Tests for the Tardis CSV loaders in the parent module.
    //!
    //! Each test asks a loader for the first [`LIMIT`] records of a sample file (located via the
    //! `ensure_data_exists_*` helpers from `nautilus_test_kit`). It checks that exactly that many
    //! items come back, then checks every field of the first decoded item. Each test runs twice
    //! through `rstest` cases: once with explicit precisions and once with `None`, which makes
    //! the loader infer precisions from the data.

    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::InstrumentId,
        types::Price,
    };
    use nautilus_test_kit::common::{
        ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
        ensure_data_exists_tardis_huobi_quotes,
    };
    use rstest::*;

    use super::*;

    /// Number of records requested from every loader; each test also expects exactly this many.
    const LIMIT: usize = 10_000;

    /// Checks that `depth` matches the first snapshot of the Binance BTCUSDT sample files.
    ///
    /// The snapshot5 and snapshot25 tests expect identical values for their first record, so
    /// both use this one helper. Expectations then cannot drift apart between the two tests.
    fn assert_first_binance_depth(depth: &OrderBookDepth10) {
        assert_eq!(depth.instrument_id, InstrumentId::from("BTCUSDT.BINANCE"));

        // Best bid level.
        assert_eq!(depth.bids.len(), 10);
        assert_eq!(depth.bids[0].price, Price::from("11657.07"));
        assert_eq!(depth.bids[0].size, Quantity::from("10.896"));
        assert_eq!(depth.bids[0].side, OrderSide::Buy);
        assert_eq!(depth.bids[0].order_id, 0);

        // Best ask level.
        assert_eq!(depth.asks.len(), 10);
        assert_eq!(depth.asks[0].price, Price::from("11657.08"));
        assert_eq!(depth.asks[0].size, Quantity::from("1.714"));
        assert_eq!(depth.asks[0].side, OrderSide::Sell);
        assert_eq!(depth.asks[0].order_id, 0);

        assert_eq!(depth.bid_counts[0], 1);
        assert_eq!(depth.ask_counts[0], 1);
        assert_eq!(depth.flags, 128);
        assert_eq!(depth.ts_event, 1598918403696000000);
        assert_eq!(depth.ts_init, 1598918403810979000);
        assert_eq!(depth.sequence, 0);
    }

    /// Loads Deribit L2 book updates and checks the first decoded delta.
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    fn test_read_deltas(#[case] price_precision: Option<u8>, #[case] size_precision: Option<u8>) {
        let filepath = ensure_data_exists_tardis_deribit_book_l2();
        let deltas =
            load_deltas(filepath, price_precision, size_precision, None, Some(LIMIT)).unwrap();

        assert_eq!(deltas.len(), LIMIT);

        let first = &deltas[0];
        assert_eq!(
            first.instrument_id,
            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
        );
        assert_eq!(first.action, BookAction::Add);
        assert_eq!(first.order.side, OrderSide::Sell);
        assert_eq!(first.order.price, Price::from("6421.5"));
        assert_eq!(first.order.size, Quantity::from("18640"));
        assert_eq!(first.flags, 0);
        assert_eq!(first.sequence, 0);
        assert_eq!(first.ts_event, 1585699200245000000);
        assert_eq!(first.ts_init, 1585699200355684000);
    }

    /// Loads Binance 5-level snapshots and checks the first `OrderBookDepth10` built from them.
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    fn test_read_depth10s_from_snapshot5(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot5();
        let depths = load_depth10_from_snapshot5(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(LIMIT),
        )
        .unwrap();

        assert_eq!(depths.len(), LIMIT);
        assert_first_binance_depth(&depths[0]);
    }

    /// Loads Binance 25-level snapshots and checks the first `OrderBookDepth10` built from them.
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    fn test_read_depth10s_from_snapshot25(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot25();
        let depths = load_depth10_from_snapshot25(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(LIMIT),
        )
        .unwrap();

        assert_eq!(depths.len(), LIMIT);
        assert_first_binance_depth(&depths[0]);
    }

    /// Loads Huobi quotes and checks the first decoded `QuoteTick`.
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    fn test_read_quotes(#[case] price_precision: Option<u8>, #[case] size_precision: Option<u8>) {
        let filepath = ensure_data_exists_tardis_huobi_quotes();
        let quotes =
            load_quote_ticks(filepath, price_precision, size_precision, None, Some(LIMIT))
                .unwrap();

        assert_eq!(quotes.len(), LIMIT);

        let first = &quotes[0];
        assert_eq!(first.instrument_id, InstrumentId::from("BTC-USD.HUOBI"));
        assert_eq!(first.bid_price, Price::from("8629.2"));
        assert_eq!(first.bid_size, Quantity::from("806"));
        assert_eq!(first.ask_price, Price::from("8629.3"));
        assert_eq!(first.ask_size, Quantity::from("5494"));
        assert_eq!(first.ts_event, 1588291201099000000);
        assert_eq!(first.ts_init, 1588291201234268000);
    }

    /// Loads BitMEX trades and checks the first decoded `TradeTick`.
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    fn test_read_trades(#[case] price_precision: Option<u8>, #[case] size_precision: Option<u8>) {
        let filepath = ensure_data_exists_tardis_bitmex_trades();
        let trades =
            load_trade_ticks(filepath, price_precision, size_precision, None, Some(LIMIT))
                .unwrap();

        assert_eq!(trades.len(), LIMIT);

        let first = &trades[0];
        assert_eq!(first.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(first.price, Price::from("8531.5"));
        assert_eq!(first.size, Quantity::from("2152"));
        assert_eq!(first.aggressor_side, AggressorSide::Seller);
        assert_eq!(
            first.trade_id,
            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
        );
        assert_eq!(first.ts_event, 1583020803145000000);
        assert_eq!(first.ts_init, 1583020803307160000);
    }
}