// nautilus_tardis/csv/mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16mod record;
17
18use std::{
19    error::Error,
20    ffi::OsStr,
21    fs::File,
22    io::{BufReader, Read, Seek, SeekFrom},
23    path::Path,
24    time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31    data::{
32        BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33    },
34    enums::{BookAction, OrderSide, RecordFlag},
35    identifiers::{InstrumentId, TradeId},
36    types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40    csv::record::{
41        TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42        TardisQuoteRecord, TardisTradeRecord,
43    },
44    parse::{
45        parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46        parse_timestamp,
47    },
48};
49use crate::parse::parse_price;
50
51fn infer_precision(value: f64) -> u8 {
52    let mut buf = ryu::Buffer::new(); // Stack allocation
53    let s = buf.format(value);
54
55    match s.rsplit_once('.') {
56        Some((_, frac)) if frac != "0" => frac.len() as u8,
57        _ => 0,
58    }
59}
60
/// Creates a buffered CSV reader over `filepath`, transparently decompressing
/// the stream when the file carries a ".gz" extension (case-insensitive).
///
/// Transient I/O failures when opening the file, probing the gzip header, or
/// rewinding are retried up to `MAX_RETRIES` times with a fixed `DELAY_MS`
/// pause between attempts; each failed attempt is logged to stderr.
///
/// # Errors
///
/// Returns an error if the file cannot be opened after all retries, if a
/// ".gz" file is smaller than two bytes, or if a ".gz" file does not start
/// with the gzip magic bytes (0x1f 0x8b).
fn create_csv_reader<P: AsRef<Path>>(
    filepath: P,
) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
    let filepath_ref = filepath.as_ref();
    const MAX_RETRIES: u8 = 3;
    const DELAY_MS: u64 = 100;
    const BUFFER_SIZE: usize = 8 * 1024 * 1024; // 8MB buffer for large files

    // Opens `path`, retrying transient failures; bails after `max_retries` attempts.
    fn open_file_with_retry<P: AsRef<Path>>(
        path: P,
        max_retries: u8,
        delay_ms: u64,
    ) -> anyhow::Result<File> {
        let path_ref = path.as_ref();
        for attempt in 1..=max_retries {
            match File::open(path_ref) {
                Ok(file) => return Ok(file),
                Err(e) => {
                    if attempt == max_retries {
                        anyhow::bail!(
                            "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
                        );
                    }
                    eprintln!(
                        "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
                    );
                    std::thread::sleep(Duration::from_millis(delay_ms));
                }
            }
        }
        unreachable!("Loop should return either Ok or Err");
    }

    let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;

    // Decompression is decided purely from the file extension
    let is_gzipped = filepath_ref
        .extension()
        .and_then(OsStr::to_str)
        .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));

    if !is_gzipped {
        let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
        return Ok(ReaderBuilder::new()
            .has_headers(true)
            .buffer_capacity(1024 * 1024) // 1MB CSV buffer
            .from_reader(Box::new(buf_reader)));
    }

    let file_size = file.metadata()?.len();
    if file_size < 2 {
        anyhow::bail!("File too small to be a valid gzip file");
    }

    // Probe the two-byte gzip magic number, retrying transient read errors
    let mut header_buf = [0u8; 2];
    for attempt in 1..=MAX_RETRIES {
        match file.read_exact(&mut header_buf) {
            Ok(()) => break,
            Err(e) => {
                if attempt == MAX_RETRIES {
                    anyhow::bail!(
                        "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
                    );
                }
                eprintln!(
                    "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
                );
                std::thread::sleep(Duration::from_millis(DELAY_MS));
            }
        }
    }

    if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
        anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
    }

    // Rewind so the decoder sees the full stream, including the probed header
    for attempt in 1..=MAX_RETRIES {
        match file.seek(SeekFrom::Start(0)) {
            Ok(_) => break,
            Err(e) => {
                if attempt == MAX_RETRIES {
                    anyhow::bail!(
                        "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
                    );
                }
                eprintln!(
                    "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
                );
                std::thread::sleep(Duration::from_millis(DELAY_MS));
            }
        }
    }

    let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
    let decoder = GzDecoder::new(buf_reader);

    Ok(ReaderBuilder::new()
        .has_headers(true)
        .buffer_capacity(1024 * 1024) // 1MB CSV buffer
        .from_reader(Box::new(decoder)))
}
161
/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// When `price_precision` / `size_precision` are `None`, precisions are
/// inferred from the data and, if a later row requires higher precision,
/// retroactively applied to all previously parsed deltas. The last delta of
/// each event-timestamp batch (and the final delta overall) is flagged
/// `F_LAST`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
pub fn load_deltas<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation (capped at 10M entries)
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);

    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut last_ts_event = UnixNanos::default();

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let data: TardisBookUpdateRecord = record.deserialize(None)?;

        // Update precisions dynamically if not explicitly set
        let mut precision_updated = false;

        if price_precision.is_none() {
            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
            if inferred_price_precision > current_price_precision {
                current_price_precision = inferred_price_precision;
                precision_updated = true;
            }
        }

        if size_precision.is_none() {
            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
            if inferred_size_precision > current_size_precision {
                current_size_precision = inferred_size_precision;
                precision_updated = true;
            }
        }

        // If precision increased, update all previous deltas so the whole
        // output series carries a uniform precision
        if precision_updated {
            for delta in deltas.iter_mut() {
                if price_precision.is_none() {
                    delta.order.price.precision = current_price_precision;
                }
                if size_precision.is_none() {
                    delta.order.size.precision = current_size_precision;
                }
            }
        }

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&data.exchange, data.symbol),
        };
        let side = parse_order_side(&data.side);
        let price = parse_price(data.price, current_price_precision);
        let size = Quantity::new(data.amount, current_size_precision);
        let order_id = 0; // Not applicable for L2 data
        let order = BookOrder::new(side, price, size, order_id);

        let action = parse_book_action(data.is_snapshot, size.as_f64());
        let flags = 0; // Flags always zero until timestamp changes
        let sequence = 0; // Sequence not available
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        // A new event timestamp closes the previous batch: back-patch the
        // prior delta with F_LAST so consumers can detect batch boundaries
        if last_ts_event != ts_event
            && let Some(last_delta) = deltas.last_mut()
        {
            // Set previous delta flags as F_LAST
            last_delta.flags = RecordFlag::F_LAST.value();
        }

        // A zero size is only valid for Delete actions; anything else
        // indicates bad data or an insufficient size precision
        assert!(
            !(action != BookAction::Delete && size.is_zero()),
            "Invalid delta: action {action} when size zero, check size_precision ({current_size_precision}) vs data; {data:?}"
        );

        last_ts_event = ts_event;

        let delta = OrderBookDelta::new(
            instrument_id,
            action,
            order,
            flags,
            sequence,
            ts_event,
            ts_init,
        );

        deltas.push(delta);

        // Stop early once the requested number of records has been parsed
        if let Some(limit) = limit
            && deltas.len() >= limit
        {
            break;
        }
    }

    // Set F_LAST flag for final delta
    if let Some(last_delta) = deltas.last_mut() {
        last_delta.flags = RecordFlag::F_LAST.value();
    }

    Ok(deltas)
}
281
282fn create_book_order(
283    side: OrderSide,
284    price: Option<f64>,
285    amount: Option<f64>,
286    price_precision: u8,
287    size_precision: u8,
288) -> (BookOrder, u32) {
289    match price {
290        Some(price) => (
291            BookOrder::new(
292                side,
293                parse_price(price, price_precision),
294                Quantity::new(amount.unwrap_or(0.0), size_precision),
295                0,
296            ),
297            1, // Count set to 1 if order exists
298        ),
299        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
300    }
301}
302
303/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
304/// automatically applying `GZip` decompression for files ending in ".gz".
305/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
306///
307/// # Errors
308///
309/// Returns an error if the file cannot be opened, read, or parsed as CSV.
310/// # Panics
311///
312/// Panics if a record level cannot be parsed to depth-10.
313pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
314    filepath: P,
315    price_precision: Option<u8>,
316    size_precision: Option<u8>,
317    instrument_id: Option<InstrumentId>,
318    limit: Option<usize>,
319) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
320    // Estimate capacity for Vec pre-allocation
321    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
322    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
323
324    let mut current_price_precision = price_precision.unwrap_or(0);
325    let mut current_size_precision = size_precision.unwrap_or(0);
326
327    let mut reader = create_csv_reader(filepath)?;
328    let mut record = StringRecord::new();
329
330    while reader.read_record(&mut record)? {
331        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
332
333        // Update precisions dynamically if not explicitly set
334        let mut precision_updated = false;
335
336        if price_precision.is_none() {
337            if let Some(bid_price) = data.bids_0_price {
338                let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
339                if inferred_price_precision > current_price_precision {
340                    current_price_precision = inferred_price_precision;
341                    precision_updated = true;
342                }
343            }
344        }
345
346        if size_precision.is_none() {
347            if let Some(bid_amount) = data.bids_0_amount {
348                let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
349                if inferred_size_precision > current_size_precision {
350                    current_size_precision = inferred_size_precision;
351                    precision_updated = true;
352                }
353            }
354        }
355
356        // If precision increased, update all previous depths
357        if precision_updated {
358            for depth in depths.iter_mut() {
359                for i in 0..DEPTH10_LEN {
360                    if price_precision.is_none() {
361                        depth.bids[i].price.precision = current_price_precision;
362                        depth.asks[i].price.precision = current_price_precision;
363                    }
364                    if size_precision.is_none() {
365                        depth.bids[i].size.precision = current_size_precision;
366                        depth.asks[i].size.precision = current_size_precision;
367                    }
368                }
369            }
370        }
371
372        let instrument_id = match &instrument_id {
373            Some(id) => *id,
374            None => parse_instrument_id(&data.exchange, data.symbol),
375        };
376        let flags = RecordFlag::F_LAST.value();
377        let sequence = 0; // Sequence not available
378        let ts_event = parse_timestamp(data.timestamp);
379        let ts_init = parse_timestamp(data.local_timestamp);
380
381        // Initialize empty arrays
382        let mut bids = [NULL_ORDER; DEPTH10_LEN];
383        let mut asks = [NULL_ORDER; DEPTH10_LEN];
384        let mut bid_counts = [0u32; DEPTH10_LEN];
385        let mut ask_counts = [0u32; DEPTH10_LEN];
386
387        for i in 0..=4 {
388            // Create bids
389            let (bid_order, bid_count) = create_book_order(
390                OrderSide::Buy,
391                match i {
392                    0 => data.bids_0_price,
393                    1 => data.bids_1_price,
394                    2 => data.bids_2_price,
395                    3 => data.bids_3_price,
396                    4 => data.bids_4_price,
397                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
398                },
399                match i {
400                    0 => data.bids_0_amount,
401                    1 => data.bids_1_amount,
402                    2 => data.bids_2_amount,
403                    3 => data.bids_3_amount,
404                    4 => data.bids_4_amount,
405                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
406                },
407                current_price_precision,
408                current_size_precision,
409            );
410            bids[i] = bid_order;
411            bid_counts[i] = bid_count;
412
413            // Create asks
414            let (ask_order, ask_count) = create_book_order(
415                OrderSide::Sell,
416                match i {
417                    0 => data.asks_0_price,
418                    1 => data.asks_1_price,
419                    2 => data.asks_2_price,
420                    3 => data.asks_3_price,
421                    4 => data.asks_4_price,
422                    _ => None, // Unreachable, but for safety
423                },
424                match i {
425                    0 => data.asks_0_amount,
426                    1 => data.asks_1_amount,
427                    2 => data.asks_2_amount,
428                    3 => data.asks_3_amount,
429                    4 => data.asks_4_amount,
430                    _ => None, // Unreachable, but for safety
431                },
432                current_price_precision,
433                current_size_precision,
434            );
435            asks[i] = ask_order;
436            ask_counts[i] = ask_count;
437        }
438
439        let depth = OrderBookDepth10::new(
440            instrument_id,
441            bids,
442            asks,
443            bid_counts,
444            ask_counts,
445            flags,
446            sequence,
447            ts_event,
448            ts_init,
449        );
450
451        depths.push(depth);
452
453        if let Some(limit) = limit
454            && depths.len() >= limit
455        {
456            break;
457        }
458    }
459
460    Ok(depths)
461}
462
463/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
464/// automatically applying `GZip` decompression for files ending in ".gz".
465/// Load order book depth-10 snapshots (25-level) from a CSV or gzipped CSV file.
466///
467/// # Errors
468///
469/// Returns an error if the file cannot be opened, read, or parsed as CSV.
470/// # Panics
471///
472/// Panics if a record level cannot be parsed to depth-10.
473pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
474    filepath: P,
475    price_precision: Option<u8>,
476    size_precision: Option<u8>,
477    instrument_id: Option<InstrumentId>,
478    limit: Option<usize>,
479) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
480    // Estimate capacity for Vec pre-allocation
481    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
482    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
483
484    let mut current_price_precision = price_precision.unwrap_or(0);
485    let mut current_size_precision = size_precision.unwrap_or(0);
486    let mut reader = create_csv_reader(filepath)?;
487    let mut record = StringRecord::new();
488
489    while reader.read_record(&mut record)? {
490        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
491
492        // Update precisions dynamically if not explicitly set
493        let mut precision_updated = false;
494
495        if price_precision.is_none() {
496            if let Some(bid_price) = data.bids_0_price {
497                let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
498                if inferred_price_precision > current_price_precision {
499                    current_price_precision = inferred_price_precision;
500                    precision_updated = true;
501                }
502            }
503        }
504
505        if size_precision.is_none() {
506            if let Some(bid_amount) = data.bids_0_amount {
507                let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
508                if inferred_size_precision > current_size_precision {
509                    current_size_precision = inferred_size_precision;
510                    precision_updated = true;
511                }
512            }
513        }
514
515        // If precision increased, update all previous depths
516        if precision_updated {
517            for depth in depths.iter_mut() {
518                for i in 0..DEPTH10_LEN {
519                    if price_precision.is_none() {
520                        depth.bids[i].price.precision = current_price_precision;
521                        depth.asks[i].price.precision = current_price_precision;
522                    }
523                    if size_precision.is_none() {
524                        depth.bids[i].size.precision = current_size_precision;
525                        depth.asks[i].size.precision = current_size_precision;
526                    }
527                }
528            }
529        }
530
531        let instrument_id = match &instrument_id {
532            Some(id) => *id,
533            None => parse_instrument_id(&data.exchange, data.symbol),
534        };
535        let flags = RecordFlag::F_LAST.value();
536        let sequence = 0; // Sequence not available
537        let ts_event = parse_timestamp(data.timestamp);
538        let ts_init = parse_timestamp(data.local_timestamp);
539
540        // Initialize empty arrays for the first 10 levels only
541        let mut bids = [NULL_ORDER; DEPTH10_LEN];
542        let mut asks = [NULL_ORDER; DEPTH10_LEN];
543        let mut bid_counts = [0u32; DEPTH10_LEN];
544        let mut ask_counts = [0u32; DEPTH10_LEN];
545
546        // Fill only the first 10 levels from the 25-level record
547        for i in 0..DEPTH10_LEN {
548            // Create bids
549            let (bid_order, bid_count) = create_book_order(
550                OrderSide::Buy,
551                match i {
552                    0 => data.bids_0_price,
553                    1 => data.bids_1_price,
554                    2 => data.bids_2_price,
555                    3 => data.bids_3_price,
556                    4 => data.bids_4_price,
557                    5 => data.bids_5_price,
558                    6 => data.bids_6_price,
559                    7 => data.bids_7_price,
560                    8 => data.bids_8_price,
561                    9 => data.bids_9_price,
562                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
563                },
564                match i {
565                    0 => data.bids_0_amount,
566                    1 => data.bids_1_amount,
567                    2 => data.bids_2_amount,
568                    3 => data.bids_3_amount,
569                    4 => data.bids_4_amount,
570                    5 => data.bids_5_amount,
571                    6 => data.bids_6_amount,
572                    7 => data.bids_7_amount,
573                    8 => data.bids_8_amount,
574                    9 => data.bids_9_amount,
575                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
576                },
577                current_price_precision,
578                current_size_precision,
579            );
580            bids[i] = bid_order;
581            bid_counts[i] = bid_count;
582
583            // Create asks
584            let (ask_order, ask_count) = create_book_order(
585                OrderSide::Sell,
586                match i {
587                    0 => data.asks_0_price,
588                    1 => data.asks_1_price,
589                    2 => data.asks_2_price,
590                    3 => data.asks_3_price,
591                    4 => data.asks_4_price,
592                    5 => data.asks_5_price,
593                    6 => data.asks_6_price,
594                    7 => data.asks_7_price,
595                    8 => data.asks_8_price,
596                    9 => data.asks_9_price,
597                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
598                },
599                match i {
600                    0 => data.asks_0_amount,
601                    1 => data.asks_1_amount,
602                    2 => data.asks_2_amount,
603                    3 => data.asks_3_amount,
604                    4 => data.asks_4_amount,
605                    5 => data.asks_5_amount,
606                    6 => data.asks_6_amount,
607                    7 => data.asks_7_amount,
608                    8 => data.asks_8_amount,
609                    9 => data.asks_9_amount,
610                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
611                },
612                current_price_precision,
613                current_size_precision,
614            );
615            asks[i] = ask_order;
616            ask_counts[i] = ask_count;
617        }
618
619        let depth = OrderBookDepth10::new(
620            instrument_id,
621            bids,
622            asks,
623            bid_counts,
624            ask_counts,
625            flags,
626            sequence,
627            ts_event,
628            ts_init,
629        );
630
631        depths.push(depth);
632
633        if let Some(limit) = limit
634            && depths.len() >= limit
635        {
636            break;
637        }
638    }
639
640    Ok(depths)
641}
642
/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// When `price_precision` / `size_precision` are `None`, precisions are
/// inferred from the data and retroactively applied to previously parsed
/// quotes whenever a higher precision is encountered. Missing bid/ask fields
/// default to zero.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a record has invalid data or CSV parsing errors.
pub fn load_quote_ticks<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation (capped at 10M entries)
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);

    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let data: TardisQuoteRecord = record.deserialize(None)?;

        // Update precisions dynamically if not explicitly set.
        // NOTE(review): precision is inferred from the bid side only — confirm
        // the ask side never carries higher precision in practice
        let mut precision_updated = false;

        if price_precision.is_none() {
            if let Some(bid_price) = data.bid_price {
                let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
                if inferred_price_precision > current_price_precision {
                    current_price_precision = inferred_price_precision;
                    precision_updated = true;
                }
            }
        }

        if size_precision.is_none() {
            if let Some(bid_amount) = data.bid_amount {
                let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
                if inferred_size_precision > current_size_precision {
                    current_size_precision = inferred_size_precision;
                    precision_updated = true;
                }
            }
        }

        // If precision increased, retroactively update all previous quotes
        if precision_updated {
            for quote in quotes.iter_mut() {
                if price_precision.is_none() {
                    quote.bid_price.precision = current_price_precision;
                    quote.ask_price.precision = current_price_precision;
                }
                if size_precision.is_none() {
                    quote.bid_size.precision = current_size_precision;
                    quote.ask_size.precision = current_size_precision;
                }
            }
        }

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&data.exchange, data.symbol),
        };
        // Missing fields default to zero values
        let bid_price = parse_price(data.bid_price.unwrap_or(0.0), current_price_precision);
        let bid_size = Quantity::new(data.bid_amount.unwrap_or(0.0), current_size_precision);
        let ask_price = parse_price(data.ask_price.unwrap_or(0.0), current_price_precision);
        let ask_size = Quantity::new(data.ask_amount.unwrap_or(0.0), current_size_precision);
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        let quote = QuoteTick::new(
            instrument_id,
            bid_price,
            ask_price,
            bid_size,
            ask_size,
            ts_event,
            ts_init,
        );

        quotes.push(quote);

        // Stop early once the requested number of records has been parsed
        if let Some(limit) = limit
            && quotes.len() >= limit
        {
            break;
        }
    }

    Ok(quotes)
}
741
/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// When `price_precision` / `size_precision` are `None`, precisions are
/// inferred from the data and retroactively applied to previously parsed
/// trades whenever a higher precision is encountered.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a record has a zero or otherwise invalid trade size.
pub fn load_trade_ticks<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<TradeTick>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation (capped at 10M entries)
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);

    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let data: TardisTradeRecord = record.deserialize(None)?;

        // Update precisions dynamically if not explicitly set
        let mut precision_updated = false;

        if price_precision.is_none() {
            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
            if inferred_price_precision > current_price_precision {
                current_price_precision = inferred_price_precision;
                precision_updated = true;
            }
        }

        if size_precision.is_none() {
            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
            if inferred_size_precision > current_size_precision {
                current_size_precision = inferred_size_precision;
                precision_updated = true;
            }
        }

        // If precision increased, retroactively update all previous trades
        if precision_updated {
            for trade in trades.iter_mut() {
                if price_precision.is_none() {
                    trade.price.precision = current_price_precision;
                }
                if size_precision.is_none() {
                    trade.size.precision = current_size_precision;
                }
            }
        }

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&data.exchange, data.symbol),
        };
        let price = parse_price(data.price, current_price_precision);
        // Trade sizes must be strictly positive; a zero size is a data error
        let size = Quantity::non_zero_checked(data.amount, current_size_precision)
            .unwrap_or_else(|e| panic!("Invalid {data:?}: size {e}"));
        let aggressor_side = parse_aggressor_side(&data.side);
        let trade_id = TradeId::new(&data.id);
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        let trade = TradeTick::new(
            instrument_id,
            price,
            size,
            aggressor_side,
            trade_id,
            ts_event,
            ts_init,
        );

        trades.push(trade);

        // Stop early once the requested number of records has been parsed
        if let Some(limit) = limit
            && trades.len() >= limit
        {
            break;
        }
    }

    Ok(trades)
}
835
836////////////////////////////////////////////////////////////////////////////////
837// Tests
838////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::InstrumentId,
        types::Price,
    };
    use nautilus_testkit::common::{
        ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
        ensure_data_exists_tardis_huobi_quotes,
    };
    use rstest::*;

    use super::*;

    /// Deletes the wrapped file when dropped.
    ///
    /// Ensures temporary fixtures are removed even when an assertion panics
    /// mid-test, instead of relying on a trailing `remove_file` call that a
    /// panic would skip.
    struct TempFileGuard(std::path::PathBuf);

    impl Drop for TempFileGuard {
        fn drop(&mut self) {
            // Best-effort: the file may already be gone, which is fine.
            std::fs::remove_file(&self.0).ok();
        }
    }

    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321, 9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        assert_eq!(infer_precision(input), expected);
    }

    #[rstest]
    pub fn test_dynamic_precision_inference() {
        // Synthetic CSV rows with strictly increasing decimal precision so the
        // loader must widen the inferred precision mid-stream and retroactively
        // update records it has already parsed.
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        // Unique per-process filename avoids collisions when multiple test
        // processes share the same temp directory; the guard removes the file
        // even if an assertion below panics.
        let temp_file = std::env::temp_dir().join(format!(
            "test_dynamic_precision_{}.csv",
            std::process::id()
        ));
        let _guard = TempFileGuard(temp_file.clone());
        std::fs::write(&temp_file, csv_data).unwrap();

        // Load with both precisions `None` to exercise inference
        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        // Should have 5 deltas
        assert_eq!(deltas.len(), 5);

        // All deltas should have the maximum precision found (4 for price, 1 for size)
        for (i, delta) in deltas.iter().enumerate() {
            assert_eq!(
                delta.order.price.precision, 4,
                "Price precision should be 4 for delta {i}",
            );
            assert_eq!(
                delta.order.size.precision, 1,
                "Size precision should be 1 for delta {i}",
            );
        }

        // Test exact values to catch precision issues
        // First delta: 50000.0 with precision 4 should equal 50000.0000
        assert_eq!(deltas[0].order.price, parse_price(50000.0, 4));
        assert_eq!(deltas[0].order.size, Quantity::new(1.0, 1));

        // Second delta: 49999.5 with precision 4 should equal 49999.5000
        assert_eq!(deltas[1].order.price, parse_price(49999.5, 4));
        assert_eq!(deltas[1].order.size, Quantity::new(2.0, 1));

        // Third delta: 50000.12 with precision 4 should equal 50000.1200
        assert_eq!(deltas[2].order.price, parse_price(50000.12, 4));
        assert_eq!(deltas[2].order.size, Quantity::new(1.5, 1));

        // Fourth delta: 49999.123 with precision 4 should equal 49999.1230
        assert_eq!(deltas[3].order.price, parse_price(49999.123, 4));
        assert_eq!(deltas[3].order.size, Quantity::new(3.0, 1));

        // Fifth delta: 50000.1234 with precision 4 should equal 50000.1234
        assert_eq!(deltas[4].order.price, parse_price(50000.1234, 4));
        assert_eq!(deltas[4].order.size, Quantity::new(0.5, 1));

        // Test that early records (with lower input precision) match expected values
        // This verifies retroactive precision updates work correctly
        assert_eq!(
            deltas[0].order.price.precision,
            deltas[4].order.price.precision
        );
        assert_eq!(
            deltas[0].order.size.precision,
            deltas[2].order.size.precision
        );
    }

    // TODO: Flaky in CI, potentially from syncing large test data files from cache
    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_deltas(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_deribit_book_l2();
        let deltas = load_deltas(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(deltas.len(), 10_000);
        assert_eq!(
            deltas[0].instrument_id,
            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
        );
        assert_eq!(deltas[0].action, BookAction::Add);
        assert_eq!(deltas[0].order.side, OrderSide::Sell);
        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
        assert_eq!(deltas[0].order.size, Quantity::from("18640"));
        assert_eq!(deltas[0].flags, 0);
        assert_eq!(deltas[0].sequence, 0);
        assert_eq!(deltas[0].ts_event, 1585699200245000000);
        assert_eq!(deltas[0].ts_init, 1585699200355684000);
    }

    // TODO: Flaky in CI, potentially from syncing large test data files from cache
    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_depth10s_from_snapshot5(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot5();
        let depths = load_depth10_from_snapshot5(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(depths.len(), 10_000);
        assert_eq!(
            depths[0].instrument_id,
            InstrumentId::from("BTCUSDT.BINANCE")
        );
        assert_eq!(depths[0].bids.len(), 10);
        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
        assert_eq!(depths[0].bids[0].order_id, 0);
        assert_eq!(depths[0].asks.len(), 10);
        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
        assert_eq!(depths[0].asks[0].order_id, 0);
        assert_eq!(depths[0].bid_counts[0], 1);
        assert_eq!(depths[0].ask_counts[0], 1);
        assert_eq!(depths[0].flags, 128);
        assert_eq!(depths[0].ts_event, 1598918403696000000);
        assert_eq!(depths[0].ts_init, 1598918403810979000);
        assert_eq!(depths[0].sequence, 0);
    }

    // TODO: Flaky in CI, potentially from syncing large test data files from cache
    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_depth10s_from_snapshot25(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot25();
        let depths = load_depth10_from_snapshot25(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(depths.len(), 10_000);
        assert_eq!(
            depths[0].instrument_id,
            InstrumentId::from("BTCUSDT.BINANCE")
        );
        assert_eq!(depths[0].bids.len(), 10);
        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
        assert_eq!(depths[0].bids[0].order_id, 0);
        assert_eq!(depths[0].asks.len(), 10);
        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
        assert_eq!(depths[0].asks[0].order_id, 0);
        assert_eq!(depths[0].bid_counts[0], 1);
        assert_eq!(depths[0].ask_counts[0], 1);
        assert_eq!(depths[0].flags, 128);
        assert_eq!(depths[0].ts_event, 1598918403696000000);
        assert_eq!(depths[0].ts_init, 1598918403810979000);
        assert_eq!(depths[0].sequence, 0);
    }

    // TODO: Flaky in CI, potentially from syncing large test data files from cache
    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_quotes(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_huobi_quotes();
        let quotes = load_quote_ticks(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(quotes.len(), 10_000);
        assert_eq!(
            quotes[0].instrument_id,
            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
        );
        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
        assert_eq!(quotes[0].ts_event, 1588291201099000000);
        assert_eq!(quotes[0].ts_init, 1588291201234268000);
    }

    // TODO: Flaky in CI, potentially from syncing large test data files from cache
    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_trades(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_bitmex_trades();
        let trades = load_trade_ticks(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(trades.len(), 10_000);
        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(trades[0].price, Price::from("8531.5"));
        assert_eq!(trades[0].size, Quantity::from("2152"));
        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
        assert_eq!(
            trades[0].trade_id,
            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
        );
        assert_eq!(trades[0].ts_event, 1583020803145000000);
        assert_eq!(trades[0].ts_init, 1583020803307160000);
    }
}