nautilus_tardis/csv/
load.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick,
24    },
25    enums::{OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31    csv::{
32        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34        record::{
35            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37        },
38    },
39    parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43    if explicit.is_some() {
44        return false;
45    }
46
47    let inferred = infer_precision(value).min(FIXED_PRECISION);
48    if inferred > *current {
49        *current = inferred;
50        true
51    } else {
52        false
53    }
54}
55
56fn update_deltas_precision(
57    deltas: &mut [OrderBookDelta],
58    price_precision: Option<u8>,
59    size_precision: Option<u8>,
60    current_price_precision: u8,
61    current_size_precision: u8,
62) {
63    for delta in deltas {
64        if price_precision.is_none() {
65            delta.order.price.precision = current_price_precision;
66        }
67        if size_precision.is_none() {
68            delta.order.size.precision = current_size_precision;
69        }
70    }
71}
72
73fn update_quotes_precision(
74    quotes: &mut [QuoteTick],
75    price_precision: Option<u8>,
76    size_precision: Option<u8>,
77    current_price_precision: u8,
78    current_size_precision: u8,
79) {
80    for quote in quotes {
81        if price_precision.is_none() {
82            quote.bid_price.precision = current_price_precision;
83            quote.ask_price.precision = current_price_precision;
84        }
85        if size_precision.is_none() {
86            quote.bid_size.precision = current_size_precision;
87            quote.ask_size.precision = current_size_precision;
88        }
89    }
90}
91
92fn update_trades_precision(
93    trades: &mut [TradeTick],
94    price_precision: Option<u8>,
95    size_precision: Option<u8>,
96    current_price_precision: u8,
97    current_size_precision: u8,
98) {
99    for trade in trades {
100        if price_precision.is_none() {
101            trade.price.precision = current_price_precision;
102        }
103        if size_precision.is_none() {
104            trade.size.precision = current_size_precision;
105        }
106    }
107}
108
109/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
110/// automatically applying `GZip` decompression for files ending in ".gz".
111/// Load order book delta records from a CSV or gzipped CSV file.
112///
113/// # Errors
114///
115/// Returns an error if the file cannot be opened, read, or parsed as CSV.
116///
117/// # Panics
118///
119/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
120pub fn load_deltas<P: AsRef<Path>>(
121    filepath: P,
122    price_precision: Option<u8>,
123    size_precision: Option<u8>,
124    instrument_id: Option<InstrumentId>,
125    limit: Option<usize>,
126) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
127    // Estimate capacity for Vec pre-allocation
128    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
129    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
130
131    let mut current_price_precision = price_precision.unwrap_or(0);
132    let mut current_size_precision = size_precision.unwrap_or(0);
133    let mut last_ts_event = UnixNanos::default();
134
135    let mut reader = create_csv_reader(filepath)?;
136    let mut record = StringRecord::new();
137
138    while reader.read_record(&mut record)? {
139        let data: TardisBookUpdateRecord = record.deserialize(None)?;
140
141        // Update precisions dynamically if not explicitly set
142        let price_updated =
143            update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
144        let size_updated =
145            update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
146
147        // If precision increased, update all previous deltas
148        if price_updated || size_updated {
149            update_deltas_precision(
150                &mut deltas,
151                price_precision,
152                size_precision,
153                current_price_precision,
154                current_size_precision,
155            );
156        }
157
158        let delta = parse_delta_record(
159            &data,
160            current_price_precision,
161            current_size_precision,
162            instrument_id,
163        );
164
165        // Check if timestamp is different from last timestamp
166        let ts_event = delta.ts_event;
167        if last_ts_event != ts_event
168            && let Some(last_delta) = deltas.last_mut()
169        {
170            // Set previous delta flags as F_LAST
171            last_delta.flags = RecordFlag::F_LAST.value();
172        }
173
174        last_ts_event = ts_event;
175
176        deltas.push(delta);
177
178        if let Some(limit) = limit
179            && deltas.len() >= limit
180        {
181            break;
182        }
183    }
184
185    // Set F_LAST flag for final delta
186    if let Some(last_delta) = deltas.last_mut() {
187        last_delta.flags = RecordFlag::F_LAST.value();
188    }
189
190    Ok(deltas)
191}
192
193/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
194/// automatically applying `GZip` decompression for files ending in ".gz".
195/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
196///
197/// # Errors
198///
199/// Returns an error if the file cannot be opened, read, or parsed as CSV.
200///
201/// # Panics
202///
203/// Panics if a record level cannot be parsed to depth-10.
204pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
205    filepath: P,
206    price_precision: Option<u8>,
207    size_precision: Option<u8>,
208    instrument_id: Option<InstrumentId>,
209    limit: Option<usize>,
210) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
211    // Estimate capacity for Vec pre-allocation
212    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
213    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
214
215    let mut current_price_precision = price_precision.unwrap_or(0);
216    let mut current_size_precision = size_precision.unwrap_or(0);
217
218    let mut reader = create_csv_reader(filepath)?;
219    let mut record = StringRecord::new();
220
221    while reader.read_record(&mut record)? {
222        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
223
224        // Update precisions dynamically if not explicitly set
225        let mut precision_updated = false;
226
227        if price_precision.is_none()
228            && let Some(bid_price) = data.bids_0_price
229        {
230            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
231            if inferred_price_precision > current_price_precision {
232                current_price_precision = inferred_price_precision;
233                precision_updated = true;
234            }
235        }
236
237        if size_precision.is_none()
238            && let Some(bid_amount) = data.bids_0_amount
239        {
240            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
241            if inferred_size_precision > current_size_precision {
242                current_size_precision = inferred_size_precision;
243                precision_updated = true;
244            }
245        }
246
247        // If precision increased, update all previous depths
248        if precision_updated {
249            for depth in &mut depths {
250                for i in 0..DEPTH10_LEN {
251                    if price_precision.is_none() {
252                        depth.bids[i].price.precision = current_price_precision;
253                        depth.asks[i].price.precision = current_price_precision;
254                    }
255                    if size_precision.is_none() {
256                        depth.bids[i].size.precision = current_size_precision;
257                        depth.asks[i].size.precision = current_size_precision;
258                    }
259                }
260            }
261        }
262
263        let instrument_id = match &instrument_id {
264            Some(id) => *id,
265            None => parse_instrument_id(&data.exchange, data.symbol),
266        };
267        let flags = RecordFlag::F_LAST.value();
268        let sequence = 0; // Sequence not available
269        let ts_event = parse_timestamp(data.timestamp);
270        let ts_init = parse_timestamp(data.local_timestamp);
271
272        // Initialize empty arrays
273        let mut bids = [NULL_ORDER; DEPTH10_LEN];
274        let mut asks = [NULL_ORDER; DEPTH10_LEN];
275        let mut bid_counts = [0u32; DEPTH10_LEN];
276        let mut ask_counts = [0u32; DEPTH10_LEN];
277
278        for i in 0..=4 {
279            // Create bids
280            let (bid_order, bid_count) = create_book_order(
281                OrderSide::Buy,
282                match i {
283                    0 => data.bids_0_price,
284                    1 => data.bids_1_price,
285                    2 => data.bids_2_price,
286                    3 => data.bids_3_price,
287                    4 => data.bids_4_price,
288                    _ => unreachable!("i is constrained to 0..=4 by loop"),
289                },
290                match i {
291                    0 => data.bids_0_amount,
292                    1 => data.bids_1_amount,
293                    2 => data.bids_2_amount,
294                    3 => data.bids_3_amount,
295                    4 => data.bids_4_amount,
296                    _ => unreachable!("i is constrained to 0..=4 by loop"),
297                },
298                current_price_precision,
299                current_size_precision,
300            );
301            bids[i] = bid_order;
302            bid_counts[i] = bid_count;
303
304            // Create asks
305            let (ask_order, ask_count) = create_book_order(
306                OrderSide::Sell,
307                match i {
308                    0 => data.asks_0_price,
309                    1 => data.asks_1_price,
310                    2 => data.asks_2_price,
311                    3 => data.asks_3_price,
312                    4 => data.asks_4_price,
313                    _ => None, // Unreachable, but for safety
314                },
315                match i {
316                    0 => data.asks_0_amount,
317                    1 => data.asks_1_amount,
318                    2 => data.asks_2_amount,
319                    3 => data.asks_3_amount,
320                    4 => data.asks_4_amount,
321                    _ => None, // Unreachable, but for safety
322                },
323                current_price_precision,
324                current_size_precision,
325            );
326            asks[i] = ask_order;
327            ask_counts[i] = ask_count;
328        }
329
330        let depth = OrderBookDepth10::new(
331            instrument_id,
332            bids,
333            asks,
334            bid_counts,
335            ask_counts,
336            flags,
337            sequence,
338            ts_event,
339            ts_init,
340        );
341
342        depths.push(depth);
343
344        if let Some(limit) = limit
345            && depths.len() >= limit
346        {
347            break;
348        }
349    }
350
351    Ok(depths)
352}
353
354/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
355/// automatically applying `GZip` decompression for files ending in ".gz".
356/// Load order book depth-10 snapshots (25-level) from a CSV or gzipped CSV file.
357///
358/// # Errors
359///
360/// Returns an error if the file cannot be opened, read, or parsed as CSV.
361pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
362    filepath: P,
363    price_precision: Option<u8>,
364    size_precision: Option<u8>,
365    instrument_id: Option<InstrumentId>,
366    limit: Option<usize>,
367) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
368    // Estimate capacity for Vec pre-allocation
369    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
370    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
371
372    let mut current_price_precision = price_precision.unwrap_or(0);
373    let mut current_size_precision = size_precision.unwrap_or(0);
374    let mut reader = create_csv_reader(filepath)?;
375    let mut record = StringRecord::new();
376
377    while reader.read_record(&mut record)? {
378        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
379
380        // Update precisions dynamically if not explicitly set
381        let mut precision_updated = false;
382
383        if price_precision.is_none()
384            && let Some(bid_price) = data.bids_0_price
385        {
386            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
387            if inferred_price_precision > current_price_precision {
388                current_price_precision = inferred_price_precision;
389                precision_updated = true;
390            }
391        }
392
393        if size_precision.is_none()
394            && let Some(bid_amount) = data.bids_0_amount
395        {
396            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
397            if inferred_size_precision > current_size_precision {
398                current_size_precision = inferred_size_precision;
399                precision_updated = true;
400            }
401        }
402
403        // If precision increased, update all previous depths
404        if precision_updated {
405            for depth in &mut depths {
406                for i in 0..DEPTH10_LEN {
407                    if price_precision.is_none() {
408                        depth.bids[i].price.precision = current_price_precision;
409                        depth.asks[i].price.precision = current_price_precision;
410                    }
411                    if size_precision.is_none() {
412                        depth.bids[i].size.precision = current_size_precision;
413                        depth.asks[i].size.precision = current_size_precision;
414                    }
415                }
416            }
417        }
418
419        let instrument_id = match &instrument_id {
420            Some(id) => *id,
421            None => parse_instrument_id(&data.exchange, data.symbol),
422        };
423        let flags = RecordFlag::F_LAST.value();
424        let sequence = 0; // Sequence not available
425        let ts_event = parse_timestamp(data.timestamp);
426        let ts_init = parse_timestamp(data.local_timestamp);
427
428        // Initialize empty arrays for the first 10 levels only
429        let mut bids = [NULL_ORDER; DEPTH10_LEN];
430        let mut asks = [NULL_ORDER; DEPTH10_LEN];
431        let mut bid_counts = [0u32; DEPTH10_LEN];
432        let mut ask_counts = [0u32; DEPTH10_LEN];
433
434        // Fill only the first 10 levels from the 25-level record
435        for i in 0..DEPTH10_LEN {
436            // Create bids
437            let (bid_order, bid_count) = create_book_order(
438                OrderSide::Buy,
439                match i {
440                    0 => data.bids_0_price,
441                    1 => data.bids_1_price,
442                    2 => data.bids_2_price,
443                    3 => data.bids_3_price,
444                    4 => data.bids_4_price,
445                    5 => data.bids_5_price,
446                    6 => data.bids_6_price,
447                    7 => data.bids_7_price,
448                    8 => data.bids_8_price,
449                    9 => data.bids_9_price,
450                    _ => unreachable!("i is constrained to 0..10 by loop"),
451                },
452                match i {
453                    0 => data.bids_0_amount,
454                    1 => data.bids_1_amount,
455                    2 => data.bids_2_amount,
456                    3 => data.bids_3_amount,
457                    4 => data.bids_4_amount,
458                    5 => data.bids_5_amount,
459                    6 => data.bids_6_amount,
460                    7 => data.bids_7_amount,
461                    8 => data.bids_8_amount,
462                    9 => data.bids_9_amount,
463                    _ => unreachable!("i is constrained to 0..10 by loop"),
464                },
465                current_price_precision,
466                current_size_precision,
467            );
468            bids[i] = bid_order;
469            bid_counts[i] = bid_count;
470
471            // Create asks
472            let (ask_order, ask_count) = create_book_order(
473                OrderSide::Sell,
474                match i {
475                    0 => data.asks_0_price,
476                    1 => data.asks_1_price,
477                    2 => data.asks_2_price,
478                    3 => data.asks_3_price,
479                    4 => data.asks_4_price,
480                    5 => data.asks_5_price,
481                    6 => data.asks_6_price,
482                    7 => data.asks_7_price,
483                    8 => data.asks_8_price,
484                    9 => data.asks_9_price,
485                    _ => unreachable!("i is constrained to 0..10 by loop"),
486                },
487                match i {
488                    0 => data.asks_0_amount,
489                    1 => data.asks_1_amount,
490                    2 => data.asks_2_amount,
491                    3 => data.asks_3_amount,
492                    4 => data.asks_4_amount,
493                    5 => data.asks_5_amount,
494                    6 => data.asks_6_amount,
495                    7 => data.asks_7_amount,
496                    8 => data.asks_8_amount,
497                    9 => data.asks_9_amount,
498                    _ => unreachable!("i is constrained to 0..10 by loop"),
499                },
500                current_price_precision,
501                current_size_precision,
502            );
503            asks[i] = ask_order;
504            ask_counts[i] = ask_count;
505        }
506
507        let depth = OrderBookDepth10::new(
508            instrument_id,
509            bids,
510            asks,
511            bid_counts,
512            ask_counts,
513            flags,
514            sequence,
515            ts_event,
516            ts_init,
517        );
518
519        depths.push(depth);
520
521        if let Some(limit) = limit
522            && depths.len() >= limit
523        {
524            break;
525        }
526    }
527
528    Ok(depths)
529}
530
531/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
532/// automatically applying `GZip` decompression for files ending in ".gz".
533/// Load quote ticks from a CSV or gzipped CSV file.
534///
535/// # Errors
536///
537/// Returns an error if the file cannot be opened, read, or parsed as CSV.
538///
539/// # Panics
540///
541/// Panics if a record has invalid data or CSV parsing errors.
542pub fn load_quotes<P: AsRef<Path>>(
543    filepath: P,
544    price_precision: Option<u8>,
545    size_precision: Option<u8>,
546    instrument_id: Option<InstrumentId>,
547    limit: Option<usize>,
548) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
549    // Estimate capacity for Vec pre-allocation
550    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
551    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
552
553    let mut current_price_precision = price_precision.unwrap_or(0);
554    let mut current_size_precision = size_precision.unwrap_or(0);
555    let mut reader = create_csv_reader(filepath)?;
556    let mut record = StringRecord::new();
557
558    while reader.read_record(&mut record)? {
559        let data: TardisQuoteRecord = record.deserialize(None)?;
560
561        // Update precisions dynamically if not explicitly set
562        let mut precision_updated = false;
563
564        if price_precision.is_none()
565            && let Some(bid_price) = data.bid_price
566        {
567            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
568            if inferred_price_precision > current_price_precision {
569                current_price_precision = inferred_price_precision;
570                precision_updated = true;
571            }
572        }
573
574        if size_precision.is_none()
575            && let Some(bid_amount) = data.bid_amount
576        {
577            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
578            if inferred_size_precision > current_size_precision {
579                current_size_precision = inferred_size_precision;
580                precision_updated = true;
581            }
582        }
583
584        // If precision increased, update all previous quotes
585        if precision_updated {
586            update_quotes_precision(
587                &mut quotes,
588                price_precision,
589                size_precision,
590                current_price_precision,
591                current_size_precision,
592            );
593        }
594
595        let quote = parse_quote_record(
596            &data,
597            current_price_precision,
598            current_size_precision,
599            instrument_id,
600        );
601
602        quotes.push(quote);
603
604        if let Some(limit) = limit
605            && quotes.len() >= limit
606        {
607            break;
608        }
609    }
610
611    Ok(quotes)
612}
613
614/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
615/// automatically applying `GZip` decompression for files ending in ".gz".
616/// Load trade ticks from a CSV or gzipped CSV file.
617///
618/// # Errors
619///
620/// Returns an error if the file cannot be opened, read, or parsed as CSV.
621///
622/// # Panics
623///
624/// Panics if a record has invalid trade size or CSV parsing errors.
625pub fn load_trades<P: AsRef<Path>>(
626    filepath: P,
627    price_precision: Option<u8>,
628    size_precision: Option<u8>,
629    instrument_id: Option<InstrumentId>,
630    limit: Option<usize>,
631) -> Result<Vec<TradeTick>, Box<dyn Error>> {
632    // Estimate capacity for Vec pre-allocation
633    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
634    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
635
636    let mut current_price_precision = price_precision.unwrap_or(0);
637    let mut current_size_precision = size_precision.unwrap_or(0);
638    let mut reader = create_csv_reader(filepath)?;
639    let mut record = StringRecord::new();
640
641    while reader.read_record(&mut record)? {
642        let data: TardisTradeRecord = record.deserialize(None)?;
643
644        // Update precisions dynamically if not explicitly set
645        let mut precision_updated = false;
646
647        if price_precision.is_none() {
648            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
649            if inferred_price_precision > current_price_precision {
650                current_price_precision = inferred_price_precision;
651                precision_updated = true;
652            }
653        }
654
655        if size_precision.is_none() {
656            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
657            if inferred_size_precision > current_size_precision {
658                current_size_precision = inferred_size_precision;
659                precision_updated = true;
660            }
661        }
662
663        // If precision increased, update all previous trades
664        if precision_updated {
665            update_trades_precision(
666                &mut trades,
667                price_precision,
668                size_precision,
669                current_price_precision,
670                current_size_precision,
671            );
672        }
673
674        let size = Quantity::new_checked(data.amount, current_size_precision)?;
675
676        if size.is_positive() {
677            let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
678
679            trades.push(trade);
680
681            if let Some(limit) = limit
682                && trades.len() >= limit
683            {
684                break;
685            }
686        } else {
687            log::warn!("Skipping zero-sized trade: {data:?}");
688        }
689    }
690
691    Ok(trades)
692}
693
694/// Loads [`FundingRateUpdate`]s from a Tardis format derivative ticker CSV at the given `filepath`,
695/// automatically applying `GZip` decompression for files ending in ".gz".
696///
697/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
698/// fields from derivative ticker data to create funding rate updates.
699///
700/// # Errors
701///
702/// Returns an error if the file cannot be opened, read, or parsed as CSV.
703pub fn load_funding_rates<P: AsRef<Path>>(
704    filepath: P,
705    instrument_id: Option<InstrumentId>,
706    limit: Option<usize>,
707) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
708    // Estimate capacity for Vec pre-allocation
709    let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
710    let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
711
712    let mut reader = create_csv_reader(filepath)?;
713    let mut record = StringRecord::new();
714
715    while reader.read_record(&mut record)? {
716        let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
717
718        // Parse to funding rate update (returns None if no funding data)
719        if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
720            funding_rates.push(funding_rate);
721
722            if let Some(limit) = limit
723                && funding_rates.len() >= limit
724            {
725                break;
726            }
727        }
728    }
729
730    Ok(funding_rates)
731}
732
733////////////////////////////////////////////////////////////////////////////////
734// Tests
735////////////////////////////////////////////////////////////////////////////////
736#[cfg(test)]
737mod tests {
738    use nautilus_model::{
739        enums::{AggressorSide, BookAction},
740        identifiers::TradeId,
741        types::Price,
742    };
743    use nautilus_testkit::common::{
744        get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
745        get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
746        get_tardis_huobi_quotes_path,
747    };
748    use rstest::*;
749
750    use super::*;
751    use crate::{parse::parse_price, tests::get_test_data_path};
752
753    #[rstest]
754    #[case(0.0, 0)]
755    #[case(42.0, 0)]
756    #[case(0.1, 1)]
757    #[case(0.25, 2)]
758    #[case(123.0001, 4)]
759    #[case(-42.987654321,       9)]
760    #[case(1.234_567_890_123, 12)]
761    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
762        assert_eq!(infer_precision(input), expected);
763    }
764
765    #[rstest]
766    pub fn test_dynamic_precision_inference() {
767        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
768binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
769binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
770binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
771binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
772binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
773
774        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
775        std::fs::write(&temp_file, csv_data).unwrap();
776
777        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
778
779        assert_eq!(deltas.len(), 5);
780
781        for (i, delta) in deltas.iter().enumerate() {
782            assert_eq!(
783                delta.order.price.precision, 4,
784                "Price precision should be 4 for delta {i}",
785            );
786            assert_eq!(
787                delta.order.size.precision, 1,
788                "Size precision should be 1 for delta {i}",
789            );
790        }
791
792        // Test exact values to ensure retroactive precision updates work correctly
793        assert_eq!(deltas[0].order.price, parse_price(50000.0, 4));
794        assert_eq!(deltas[0].order.size, Quantity::new(1.0, 1));
795
796        assert_eq!(deltas[1].order.price, parse_price(49999.5, 4));
797        assert_eq!(deltas[1].order.size, Quantity::new(2.0, 1));
798
799        assert_eq!(deltas[2].order.price, parse_price(50000.12, 4));
800        assert_eq!(deltas[2].order.size, Quantity::new(1.5, 1));
801
802        assert_eq!(deltas[3].order.price, parse_price(49999.123, 4));
803        assert_eq!(deltas[3].order.size, Quantity::new(3.0, 1));
804
805        assert_eq!(deltas[4].order.price, parse_price(50000.1234, 4));
806        assert_eq!(deltas[4].order.size, Quantity::new(0.5, 1));
807
808        assert_eq!(
809            deltas[0].order.price.precision,
810            deltas[4].order.price.precision
811        );
812        assert_eq!(
813            deltas[0].order.size.precision,
814            deltas[2].order.size.precision
815        );
816
817        std::fs::remove_file(&temp_file).ok();
818    }
819
820    #[rstest]
821    #[case(Some(1), Some(0))] // Explicit precisions
822    #[case(None, None)] // Inferred precisions
823    pub fn test_read_deltas(
824        #[case] price_precision: Option<u8>,
825        #[case] size_precision: Option<u8>,
826    ) {
827        let filepath = get_tardis_deribit_book_l2_path();
828        let deltas =
829            load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
830
831        assert_eq!(deltas.len(), 15);
832        assert_eq!(
833            deltas[0].instrument_id,
834            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
835        );
836        assert_eq!(deltas[0].action, BookAction::Add);
837        assert_eq!(deltas[0].order.side, OrderSide::Sell);
838        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
839        assert_eq!(deltas[0].order.size, Quantity::from("18640"));
840        assert_eq!(deltas[0].flags, 0);
841        assert_eq!(deltas[0].sequence, 0);
842        assert_eq!(deltas[0].ts_event, 1585699200245000000);
843        assert_eq!(deltas[0].ts_init, 1585699200355684000);
844    }
845
846    #[rstest]
847    #[case(Some(2), Some(3))] // Explicit precisions
848    #[case(None, None)] // Inferred precisions
849    pub fn test_read_depth10s_from_snapshot5(
850        #[case] price_precision: Option<u8>,
851        #[case] size_precision: Option<u8>,
852    ) {
853        let filepath = get_tardis_binance_snapshot5_path();
854        let depths =
855            load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
856                .unwrap();
857
858        assert_eq!(depths.len(), 10);
859        assert_eq!(
860            depths[0].instrument_id,
861            InstrumentId::from("BTCUSDT.BINANCE")
862        );
863        assert_eq!(depths[0].bids.len(), 10);
864        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
865        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
866        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
867        assert_eq!(depths[0].bids[0].order_id, 0);
868        assert_eq!(depths[0].asks.len(), 10);
869        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
870        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
871        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
872        assert_eq!(depths[0].asks[0].order_id, 0);
873        assert_eq!(depths[0].bid_counts[0], 1);
874        assert_eq!(depths[0].ask_counts[0], 1);
875        assert_eq!(depths[0].flags, 128);
876        assert_eq!(depths[0].ts_event, 1598918403696000000);
877        assert_eq!(depths[0].ts_init, 1598918403810979000);
878        assert_eq!(depths[0].sequence, 0);
879    }
880
881    #[rstest]
882    #[case(Some(2), Some(3))] // Explicit precisions
883    #[case(None, None)] // Inferred precisions
884    pub fn test_read_depth10s_from_snapshot25(
885        #[case] price_precision: Option<u8>,
886        #[case] size_precision: Option<u8>,
887    ) {
888        let filepath = get_tardis_binance_snapshot25_path();
889        let depths = load_depth10_from_snapshot25(
890            filepath,
891            price_precision,
892            size_precision,
893            None,
894            Some(100),
895        )
896        .unwrap();
897
898        assert_eq!(depths.len(), 10);
899        assert_eq!(
900            depths[0].instrument_id,
901            InstrumentId::from("BTCUSDT.BINANCE")
902        );
903        assert_eq!(depths[0].bids.len(), 10);
904        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
905        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
906        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
907        assert_eq!(depths[0].bids[0].order_id, 0);
908        assert_eq!(depths[0].asks.len(), 10);
909        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
910        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
911        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
912        assert_eq!(depths[0].asks[0].order_id, 0);
913        assert_eq!(depths[0].bid_counts[0], 1);
914        assert_eq!(depths[0].ask_counts[0], 1);
915        assert_eq!(depths[0].flags, 128);
916        assert_eq!(depths[0].ts_event, 1598918403696000000);
917        assert_eq!(depths[0].ts_init, 1598918403810979000);
918        assert_eq!(depths[0].sequence, 0);
919    }
920
921    #[rstest]
922    #[case(Some(1), Some(0))] // Explicit precisions
923    #[case(None, None)] // Inferred precisions
924    pub fn test_read_quotes(
925        #[case] price_precision: Option<u8>,
926        #[case] size_precision: Option<u8>,
927    ) {
928        let filepath = get_tardis_huobi_quotes_path();
929        let quotes =
930            load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
931
932        assert_eq!(quotes.len(), 10);
933        assert_eq!(
934            quotes[0].instrument_id,
935            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
936        );
937        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
938        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
939        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
940        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
941        assert_eq!(quotes[0].ts_event, 1588291201099000000);
942        assert_eq!(quotes[0].ts_init, 1588291201234268000);
943    }
944
945    #[rstest]
946    #[case(Some(1), Some(0))] // Explicit precisions
947    #[case(None, None)] // Inferred precisions
948    pub fn test_read_trades(
949        #[case] price_precision: Option<u8>,
950        #[case] size_precision: Option<u8>,
951    ) {
952        let filepath = get_tardis_bitmex_trades_path();
953        let trades =
954            load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
955
956        assert_eq!(trades.len(), 10);
957        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
958        assert_eq!(trades[0].price, Price::from("8531.5"));
959        assert_eq!(trades[0].size, Quantity::from("2152"));
960        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
961        assert_eq!(
962            trades[0].trade_id,
963            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
964        );
965        assert_eq!(trades[0].ts_event, 1583020803145000000);
966        assert_eq!(trades[0].ts_init, 1583020803307160000);
967    }
968
969    #[rstest]
970    pub fn test_load_trades_with_zero_sized_trade() {
971        // Create test CSV data with one zero-sized trade that should be skipped
972        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
973binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
974binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
975binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
976binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
977
978        let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
979        std::fs::write(&temp_file, csv_data).unwrap();
980
981        let trades = load_trades(
982            &temp_file,
983            Some(4),
984            Some(1),
985            None,
986            None, // No limit, load all
987        )
988        .unwrap();
989
990        // Should have 3 trades (zero-sized trade skipped)
991        assert_eq!(trades.len(), 3);
992
993        // Verify the correct trades were loaded (not the zero-sized one)
994        assert_eq!(trades[0].size, Quantity::from("1.0"));
995        assert_eq!(trades[1].size, Quantity::from("1.5"));
996        assert_eq!(trades[2].size, Quantity::from("3.0"));
997
998        // Verify trade IDs to confirm correct trades were loaded
999        assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1000        assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1001        assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1002
1003        std::fs::remove_file(&temp_file).ok();
1004    }
1005
1006    #[rstest]
1007    pub fn test_load_trades_from_local_file() {
1008        let filepath = get_test_data_path("csv/trades_1.csv");
1009        let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1010        assert_eq!(trades.len(), 2);
1011        assert_eq!(trades[0].price, Price::from("8531.5"));
1012        assert_eq!(trades[1].size, Quantity::from("1000"));
1013    }
1014
1015    #[rstest]
1016    pub fn test_load_deltas_from_local_file() {
1017        let filepath = get_test_data_path("csv/deltas_1.csv");
1018        let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1019        assert_eq!(deltas.len(), 2);
1020        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
1021        assert_eq!(deltas[1].order.size, Quantity::from("10000"));
1022    }
1023
1024    #[rstest]
1025    fn test_load_depth10_from_snapshot5_comprehensive() {
1026        let filepath = get_tardis_binance_snapshot5_path();
1027        let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1028
1029        assert_eq!(depths.len(), 10);
1030
1031        let first = &depths[0];
1032        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1033        assert_eq!(first.bids.len(), 10);
1034        assert_eq!(first.asks.len(), 10);
1035
1036        // Check all bid levels (first 5 from data, rest empty)
1037        assert_eq!(first.bids[0].price, Price::from("11657.07"));
1038        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1039        assert_eq!(first.bids[0].side, OrderSide::Buy);
1040
1041        assert_eq!(first.bids[1].price, Price::from("11656.97"));
1042        assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1043        assert_eq!(first.bids[1].side, OrderSide::Buy);
1044
1045        assert_eq!(first.bids[2].price, Price::from("11655.78"));
1046        assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1047        assert_eq!(first.bids[2].side, OrderSide::Buy);
1048
1049        assert_eq!(first.bids[3].price, Price::from("11655.77"));
1050        assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1051        assert_eq!(first.bids[3].side, OrderSide::Buy);
1052
1053        assert_eq!(first.bids[4].price, Price::from("11655.68"));
1054        assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1055        assert_eq!(first.bids[4].side, OrderSide::Buy);
1056
1057        // Empty levels
1058        for i in 5..10 {
1059            assert_eq!(first.bids[i].price.raw, 0);
1060            assert_eq!(first.bids[i].size.raw, 0);
1061            assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1062        }
1063
1064        // Check all ask levels (first 5 from data, rest empty)
1065        assert_eq!(first.asks[0].price, Price::from("11657.08"));
1066        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1067        assert_eq!(first.asks[0].side, OrderSide::Sell);
1068
1069        assert_eq!(first.asks[1].price, Price::from("11657.54"));
1070        assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1071        assert_eq!(first.asks[1].side, OrderSide::Sell);
1072
1073        assert_eq!(first.asks[2].price, Price::from("11657.56"));
1074        assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1075        assert_eq!(first.asks[2].side, OrderSide::Sell);
1076
1077        assert_eq!(first.asks[3].price, Price::from("11657.61"));
1078        assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1079        assert_eq!(first.asks[3].side, OrderSide::Sell);
1080
1081        assert_eq!(first.asks[4].price, Price::from("11657.92"));
1082        assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1083        assert_eq!(first.asks[4].side, OrderSide::Sell);
1084
1085        // Empty levels
1086        for i in 5..10 {
1087            assert_eq!(first.asks[i].price.raw, 0);
1088            assert_eq!(first.asks[i].size.raw, 0);
1089            assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1090        }
1091
1092        // Logical checks: bid prices should decrease
1093        for i in 1..5 {
1094            assert!(
1095                first.bids[i].price < first.bids[i - 1].price,
1096                "Bid price at level {} should be less than level {}",
1097                i,
1098                i - 1
1099            );
1100        }
1101
1102        // Logical checks: ask prices should increase
1103        for i in 1..5 {
1104            assert!(
1105                first.asks[i].price > first.asks[i - 1].price,
1106                "Ask price at level {} should be greater than level {}",
1107                i,
1108                i - 1
1109            );
1110        }
1111
1112        // Logical check: spread should be positive
1113        assert!(
1114            first.asks[0].price > first.bids[0].price,
1115            "Best ask should be greater than best bid"
1116        );
1117
1118        // Check counts
1119        for i in 0..5 {
1120            assert_eq!(first.bid_counts[i], 1);
1121            assert_eq!(first.ask_counts[i], 1);
1122        }
1123        for i in 5..10 {
1124            assert_eq!(first.bid_counts[i], 0);
1125            assert_eq!(first.ask_counts[i], 0);
1126        }
1127
1128        // Check metadata
1129        assert_eq!(first.flags, 128); // F_SNAPSHOT flag
1130        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1131        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1132        assert_eq!(first.sequence, 0);
1133    }
1134
1135    #[rstest]
1136    fn test_load_depth10_from_snapshot25_comprehensive() {
1137        let filepath = get_tardis_binance_snapshot25_path();
1138        let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1139
1140        assert_eq!(depths.len(), 10);
1141
1142        let first = &depths[0];
1143        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1144        assert_eq!(first.bids.len(), 10);
1145        assert_eq!(first.asks.len(), 10);
1146
1147        // Check all 10 bid levels from snapshot25
1148        let expected_bids = vec![
1149            ("11657.07", "10.896"),
1150            ("11656.97", "0.2"),
1151            ("11655.78", "0.2"),
1152            ("11655.77", "0.98"),
1153            ("11655.68", "0.111"),
1154            ("11655.66", "0.077"),
1155            ("11655.57", "0.34"),
1156            ("11655.48", "0.4"),
1157            ("11655.26", "1.185"),
1158            ("11654.86", "0.195"),
1159        ];
1160
1161        for (i, (price, size)) in expected_bids.iter().enumerate() {
1162            assert_eq!(first.bids[i].price, Price::from(*price));
1163            assert_eq!(first.bids[i].size, Quantity::from(*size));
1164            assert_eq!(first.bids[i].side, OrderSide::Buy);
1165        }
1166
1167        // Check all 10 ask levels from snapshot25
1168        let expected_asks = vec![
1169            ("11657.08", "1.714"),
1170            ("11657.54", "5.4"),
1171            ("11657.56", "0.238"),
1172            ("11657.61", "0.077"),
1173            ("11657.92", "0.918"),
1174            ("11658.09", "1.015"),
1175            ("11658.12", "0.665"),
1176            ("11658.19", "0.583"),
1177            ("11658.28", "0.255"),
1178            ("11658.29", "0.656"),
1179        ];
1180
1181        for (i, (price, size)) in expected_asks.iter().enumerate() {
1182            assert_eq!(first.asks[i].price, Price::from(*price));
1183            assert_eq!(first.asks[i].size, Quantity::from(*size));
1184            assert_eq!(first.asks[i].side, OrderSide::Sell);
1185        }
1186
1187        // Logical checks: bid prices should strictly decrease
1188        for i in 1..10 {
1189            assert!(
1190                first.bids[i].price < first.bids[i - 1].price,
1191                "Bid price at level {} ({}) should be less than level {} ({})",
1192                i,
1193                first.bids[i].price,
1194                i - 1,
1195                first.bids[i - 1].price
1196            );
1197        }
1198
1199        // Logical checks: ask prices should strictly increase
1200        for i in 1..10 {
1201            assert!(
1202                first.asks[i].price > first.asks[i - 1].price,
1203                "Ask price at level {} ({}) should be greater than level {} ({})",
1204                i,
1205                first.asks[i].price,
1206                i - 1,
1207                first.asks[i - 1].price
1208            );
1209        }
1210
1211        // Logical check: spread should be positive
1212        assert!(
1213            first.asks[0].price > first.bids[0].price,
1214            "Best ask ({}) should be greater than best bid ({})",
1215            first.asks[0].price,
1216            first.bids[0].price
1217        );
1218
1219        // Check counts (all should be 1 for snapshot data)
1220        for i in 0..10 {
1221            assert_eq!(first.bid_counts[i], 1);
1222            assert_eq!(first.ask_counts[i], 1);
1223        }
1224
1225        // Check metadata
1226        assert_eq!(first.flags, 128); // F_SNAPSHOT flag
1227        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1228        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1229        assert_eq!(first.sequence, 0);
1230    }
1231
1232    #[rstest]
1233    fn test_snapshot_csv_field_order_interleaved() {
1234        // This test verifies that the CSV structs correctly handle the interleaved
1235        // asks/bids field ordering from Tardis CSV files
1236
1237        let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1238asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1239asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1240asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1241asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1242asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1243binance-futures,BTCUSDT,1000000,2000000,\
1244100.5,1.0,100.4,2.0,\
1245100.6,1.1,100.3,2.1,\
1246100.7,1.2,100.2,2.2,\
1247100.8,1.3,100.1,2.3,\
1248100.9,1.4,100.0,2.4";
1249
1250        let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1251        std::fs::write(&temp_file, csv_data).unwrap();
1252
1253        let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1254        assert_eq!(depths.len(), 1);
1255
1256        let depth = &depths[0];
1257
1258        // Verify bids are correctly parsed (should be decreasing)
1259        assert_eq!(depth.bids[0].price, Price::from("100.4"));
1260        assert_eq!(depth.bids[1].price, Price::from("100.3"));
1261        assert_eq!(depth.bids[2].price, Price::from("100.2"));
1262        assert_eq!(depth.bids[3].price, Price::from("100.1"));
1263        assert_eq!(depth.bids[4].price, Price::from("100.0"));
1264
1265        // Verify asks are correctly parsed (should be increasing)
1266        assert_eq!(depth.asks[0].price, Price::from("100.5"));
1267        assert_eq!(depth.asks[1].price, Price::from("100.6"));
1268        assert_eq!(depth.asks[2].price, Price::from("100.7"));
1269        assert_eq!(depth.asks[3].price, Price::from("100.8"));
1270        assert_eq!(depth.asks[4].price, Price::from("100.9"));
1271
1272        // Verify sizes
1273        assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1274        assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1275
1276        std::fs::remove_file(temp_file).unwrap();
1277    }
1278}