nautilus_tardis/csv/
load.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick,
24    },
25    enums::{OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31    csv::{
32        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34        record::{
35            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37        },
38    },
39    parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43    if explicit.is_some() {
44        return false;
45    }
46
47    let inferred = infer_precision(value).min(FIXED_PRECISION);
48    if inferred > *current {
49        *current = inferred;
50        true
51    } else {
52        false
53    }
54}
55
56fn update_deltas_precision(
57    deltas: &mut [OrderBookDelta],
58    price_precision: Option<u8>,
59    size_precision: Option<u8>,
60    current_price_precision: u8,
61    current_size_precision: u8,
62) {
63    for delta in deltas {
64        if price_precision.is_none() {
65            delta.order.price.precision = current_price_precision;
66        }
67        if size_precision.is_none() {
68            delta.order.size.precision = current_size_precision;
69        }
70    }
71}
72
73fn update_quotes_precision(
74    quotes: &mut [QuoteTick],
75    price_precision: Option<u8>,
76    size_precision: Option<u8>,
77    current_price_precision: u8,
78    current_size_precision: u8,
79) {
80    for quote in quotes {
81        if price_precision.is_none() {
82            quote.bid_price.precision = current_price_precision;
83            quote.ask_price.precision = current_price_precision;
84        }
85        if size_precision.is_none() {
86            quote.bid_size.precision = current_size_precision;
87            quote.ask_size.precision = current_size_precision;
88        }
89    }
90}
91
92fn update_trades_precision(
93    trades: &mut [TradeTick],
94    price_precision: Option<u8>,
95    size_precision: Option<u8>,
96    current_price_precision: u8,
97    current_size_precision: u8,
98) {
99    for trade in trades {
100        if price_precision.is_none() {
101            trade.price.precision = current_price_precision;
102        }
103        if size_precision.is_none() {
104            trade.size.precision = current_size_precision;
105        }
106    }
107}
108
109/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
110/// automatically applying `GZip` decompression for files ending in ".gz".
111/// Load order book delta records from a CSV or gzipped CSV file.
112///
113/// # Errors
114///
115/// Returns an error if the file cannot be opened, read, or parsed as CSV.
116///
117/// # Panics
118///
119/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
120pub fn load_deltas<P: AsRef<Path>>(
121    filepath: P,
122    price_precision: Option<u8>,
123    size_precision: Option<u8>,
124    instrument_id: Option<InstrumentId>,
125    limit: Option<usize>,
126) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
127    // Estimate capacity for Vec pre-allocation
128    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
129    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
130
131    let mut current_price_precision = price_precision.unwrap_or(0);
132    let mut current_size_precision = size_precision.unwrap_or(0);
133    let mut last_ts_event = UnixNanos::default();
134
135    let mut reader = create_csv_reader(filepath)?;
136    let mut record = StringRecord::new();
137
138    while reader.read_record(&mut record)? {
139        let data: TardisBookUpdateRecord = record.deserialize(None)?;
140
141        // Update precisions dynamically if not explicitly set
142        let price_updated =
143            update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
144        let size_updated =
145            update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
146
147        // If precision increased, update all previous deltas
148        if price_updated || size_updated {
149            update_deltas_precision(
150                &mut deltas,
151                price_precision,
152                size_precision,
153                current_price_precision,
154                current_size_precision,
155            );
156        }
157
158        let delta = parse_delta_record(
159            &data,
160            current_price_precision,
161            current_size_precision,
162            instrument_id,
163        );
164
165        // Check if timestamp is different from last timestamp
166        let ts_event = delta.ts_event;
167        if last_ts_event != ts_event
168            && let Some(last_delta) = deltas.last_mut()
169        {
170            // Set previous delta flags as F_LAST
171            last_delta.flags = RecordFlag::F_LAST.value();
172        }
173
174        last_ts_event = ts_event;
175
176        deltas.push(delta);
177
178        if let Some(limit) = limit
179            && deltas.len() >= limit
180        {
181            break;
182        }
183    }
184
185    // Set F_LAST flag for final delta
186    if let Some(last_delta) = deltas.last_mut() {
187        last_delta.flags = RecordFlag::F_LAST.value();
188    }
189
190    Ok(deltas)
191}
192
193/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
194/// automatically applying `GZip` decompression for files ending in ".gz".
195/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
196///
197/// # Errors
198///
199/// Returns an error if the file cannot be opened, read, or parsed as CSV.
200///
201/// # Panics
202///
203/// Panics if a record level cannot be parsed to depth-10.
204pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
205    filepath: P,
206    price_precision: Option<u8>,
207    size_precision: Option<u8>,
208    instrument_id: Option<InstrumentId>,
209    limit: Option<usize>,
210) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
211    // Estimate capacity for Vec pre-allocation
212    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
213    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
214
215    let mut current_price_precision = price_precision.unwrap_or(0);
216    let mut current_size_precision = size_precision.unwrap_or(0);
217
218    let mut reader = create_csv_reader(filepath)?;
219    let mut record = StringRecord::new();
220
221    while reader.read_record(&mut record)? {
222        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
223
224        // Update precisions dynamically if not explicitly set
225        let mut precision_updated = false;
226
227        if price_precision.is_none()
228            && let Some(bid_price) = data.bids_0_price
229        {
230            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
231            if inferred_price_precision > current_price_precision {
232                current_price_precision = inferred_price_precision;
233                precision_updated = true;
234            }
235        }
236
237        if size_precision.is_none()
238            && let Some(bid_amount) = data.bids_0_amount
239        {
240            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
241            if inferred_size_precision > current_size_precision {
242                current_size_precision = inferred_size_precision;
243                precision_updated = true;
244            }
245        }
246
247        // If precision increased, update all previous depths
248        if precision_updated {
249            for depth in &mut depths {
250                for i in 0..DEPTH10_LEN {
251                    if price_precision.is_none() {
252                        depth.bids[i].price.precision = current_price_precision;
253                        depth.asks[i].price.precision = current_price_precision;
254                    }
255                    if size_precision.is_none() {
256                        depth.bids[i].size.precision = current_size_precision;
257                        depth.asks[i].size.precision = current_size_precision;
258                    }
259                }
260            }
261        }
262
263        let instrument_id = match &instrument_id {
264            Some(id) => *id,
265            None => parse_instrument_id(&data.exchange, data.symbol),
266        };
267        let flags = RecordFlag::F_LAST.value();
268        let sequence = 0; // Sequence not available
269        let ts_event = parse_timestamp(data.timestamp);
270        let ts_init = parse_timestamp(data.local_timestamp);
271
272        // Initialize empty arrays
273        let mut bids = [NULL_ORDER; DEPTH10_LEN];
274        let mut asks = [NULL_ORDER; DEPTH10_LEN];
275        let mut bid_counts = [0u32; DEPTH10_LEN];
276        let mut ask_counts = [0u32; DEPTH10_LEN];
277
278        for i in 0..=4 {
279            // Create bids
280            let (bid_order, bid_count) = create_book_order(
281                OrderSide::Buy,
282                match i {
283                    0 => data.bids_0_price,
284                    1 => data.bids_1_price,
285                    2 => data.bids_2_price,
286                    3 => data.bids_3_price,
287                    4 => data.bids_4_price,
288                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
289                },
290                match i {
291                    0 => data.bids_0_amount,
292                    1 => data.bids_1_amount,
293                    2 => data.bids_2_amount,
294                    3 => data.bids_3_amount,
295                    4 => data.bids_4_amount,
296                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
297                },
298                current_price_precision,
299                current_size_precision,
300            );
301            bids[i] = bid_order;
302            bid_counts[i] = bid_count;
303
304            // Create asks
305            let (ask_order, ask_count) = create_book_order(
306                OrderSide::Sell,
307                match i {
308                    0 => data.asks_0_price,
309                    1 => data.asks_1_price,
310                    2 => data.asks_2_price,
311                    3 => data.asks_3_price,
312                    4 => data.asks_4_price,
313                    _ => None, // Unreachable, but for safety
314                },
315                match i {
316                    0 => data.asks_0_amount,
317                    1 => data.asks_1_amount,
318                    2 => data.asks_2_amount,
319                    3 => data.asks_3_amount,
320                    4 => data.asks_4_amount,
321                    _ => None, // Unreachable, but for safety
322                },
323                current_price_precision,
324                current_size_precision,
325            );
326            asks[i] = ask_order;
327            ask_counts[i] = ask_count;
328        }
329
330        let depth = OrderBookDepth10::new(
331            instrument_id,
332            bids,
333            asks,
334            bid_counts,
335            ask_counts,
336            flags,
337            sequence,
338            ts_event,
339            ts_init,
340        );
341
342        depths.push(depth);
343
344        if let Some(limit) = limit
345            && depths.len() >= limit
346        {
347            break;
348        }
349    }
350
351    Ok(depths)
352}
353
354/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
355/// automatically applying `GZip` decompression for files ending in ".gz".
356/// Load order book depth-10 snapshots (25-level) from a CSV or gzipped CSV file.
357///
358/// # Errors
359///
360/// Returns an error if the file cannot be opened, read, or parsed as CSV.
361///
362/// # Panics
363///
364/// Panics if a record level cannot be parsed to depth-10.
365pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
366    filepath: P,
367    price_precision: Option<u8>,
368    size_precision: Option<u8>,
369    instrument_id: Option<InstrumentId>,
370    limit: Option<usize>,
371) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
372    // Estimate capacity for Vec pre-allocation
373    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
374    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
375
376    let mut current_price_precision = price_precision.unwrap_or(0);
377    let mut current_size_precision = size_precision.unwrap_or(0);
378    let mut reader = create_csv_reader(filepath)?;
379    let mut record = StringRecord::new();
380
381    while reader.read_record(&mut record)? {
382        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
383
384        // Update precisions dynamically if not explicitly set
385        let mut precision_updated = false;
386
387        if price_precision.is_none()
388            && let Some(bid_price) = data.bids_0_price
389        {
390            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
391            if inferred_price_precision > current_price_precision {
392                current_price_precision = inferred_price_precision;
393                precision_updated = true;
394            }
395        }
396
397        if size_precision.is_none()
398            && let Some(bid_amount) = data.bids_0_amount
399        {
400            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
401            if inferred_size_precision > current_size_precision {
402                current_size_precision = inferred_size_precision;
403                precision_updated = true;
404            }
405        }
406
407        // If precision increased, update all previous depths
408        if precision_updated {
409            for depth in &mut depths {
410                for i in 0..DEPTH10_LEN {
411                    if price_precision.is_none() {
412                        depth.bids[i].price.precision = current_price_precision;
413                        depth.asks[i].price.precision = current_price_precision;
414                    }
415                    if size_precision.is_none() {
416                        depth.bids[i].size.precision = current_size_precision;
417                        depth.asks[i].size.precision = current_size_precision;
418                    }
419                }
420            }
421        }
422
423        let instrument_id = match &instrument_id {
424            Some(id) => *id,
425            None => parse_instrument_id(&data.exchange, data.symbol),
426        };
427        let flags = RecordFlag::F_LAST.value();
428        let sequence = 0; // Sequence not available
429        let ts_event = parse_timestamp(data.timestamp);
430        let ts_init = parse_timestamp(data.local_timestamp);
431
432        // Initialize empty arrays for the first 10 levels only
433        let mut bids = [NULL_ORDER; DEPTH10_LEN];
434        let mut asks = [NULL_ORDER; DEPTH10_LEN];
435        let mut bid_counts = [0u32; DEPTH10_LEN];
436        let mut ask_counts = [0u32; DEPTH10_LEN];
437
438        // Fill only the first 10 levels from the 25-level record
439        for i in 0..DEPTH10_LEN {
440            // Create bids
441            let (bid_order, bid_count) = create_book_order(
442                OrderSide::Buy,
443                match i {
444                    0 => data.bids_0_price,
445                    1 => data.bids_1_price,
446                    2 => data.bids_2_price,
447                    3 => data.bids_3_price,
448                    4 => data.bids_4_price,
449                    5 => data.bids_5_price,
450                    6 => data.bids_6_price,
451                    7 => data.bids_7_price,
452                    8 => data.bids_8_price,
453                    9 => data.bids_9_price,
454                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
455                },
456                match i {
457                    0 => data.bids_0_amount,
458                    1 => data.bids_1_amount,
459                    2 => data.bids_2_amount,
460                    3 => data.bids_3_amount,
461                    4 => data.bids_4_amount,
462                    5 => data.bids_5_amount,
463                    6 => data.bids_6_amount,
464                    7 => data.bids_7_amount,
465                    8 => data.bids_8_amount,
466                    9 => data.bids_9_amount,
467                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
468                },
469                current_price_precision,
470                current_size_precision,
471            );
472            bids[i] = bid_order;
473            bid_counts[i] = bid_count;
474
475            // Create asks
476            let (ask_order, ask_count) = create_book_order(
477                OrderSide::Sell,
478                match i {
479                    0 => data.asks_0_price,
480                    1 => data.asks_1_price,
481                    2 => data.asks_2_price,
482                    3 => data.asks_3_price,
483                    4 => data.asks_4_price,
484                    5 => data.asks_5_price,
485                    6 => data.asks_6_price,
486                    7 => data.asks_7_price,
487                    8 => data.asks_8_price,
488                    9 => data.asks_9_price,
489                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
490                },
491                match i {
492                    0 => data.asks_0_amount,
493                    1 => data.asks_1_amount,
494                    2 => data.asks_2_amount,
495                    3 => data.asks_3_amount,
496                    4 => data.asks_4_amount,
497                    5 => data.asks_5_amount,
498                    6 => data.asks_6_amount,
499                    7 => data.asks_7_amount,
500                    8 => data.asks_8_amount,
501                    9 => data.asks_9_amount,
502                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
503                },
504                current_price_precision,
505                current_size_precision,
506            );
507            asks[i] = ask_order;
508            ask_counts[i] = ask_count;
509        }
510
511        let depth = OrderBookDepth10::new(
512            instrument_id,
513            bids,
514            asks,
515            bid_counts,
516            ask_counts,
517            flags,
518            sequence,
519            ts_event,
520            ts_init,
521        );
522
523        depths.push(depth);
524
525        if let Some(limit) = limit
526            && depths.len() >= limit
527        {
528            break;
529        }
530    }
531
532    Ok(depths)
533}
534
535/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
536/// automatically applying `GZip` decompression for files ending in ".gz".
537/// Load quote ticks from a CSV or gzipped CSV file.
538///
539/// # Errors
540///
541/// Returns an error if the file cannot be opened, read, or parsed as CSV.
542///
543/// # Panics
544///
545/// Panics if a record has invalid data or CSV parsing errors.
546pub fn load_quotes<P: AsRef<Path>>(
547    filepath: P,
548    price_precision: Option<u8>,
549    size_precision: Option<u8>,
550    instrument_id: Option<InstrumentId>,
551    limit: Option<usize>,
552) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
553    // Estimate capacity for Vec pre-allocation
554    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
555    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
556
557    let mut current_price_precision = price_precision.unwrap_or(0);
558    let mut current_size_precision = size_precision.unwrap_or(0);
559    let mut reader = create_csv_reader(filepath)?;
560    let mut record = StringRecord::new();
561
562    while reader.read_record(&mut record)? {
563        let data: TardisQuoteRecord = record.deserialize(None)?;
564
565        // Update precisions dynamically if not explicitly set
566        let mut precision_updated = false;
567
568        if price_precision.is_none()
569            && let Some(bid_price) = data.bid_price
570        {
571            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
572            if inferred_price_precision > current_price_precision {
573                current_price_precision = inferred_price_precision;
574                precision_updated = true;
575            }
576        }
577
578        if size_precision.is_none()
579            && let Some(bid_amount) = data.bid_amount
580        {
581            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
582            if inferred_size_precision > current_size_precision {
583                current_size_precision = inferred_size_precision;
584                precision_updated = true;
585            }
586        }
587
588        // If precision increased, update all previous quotes
589        if precision_updated {
590            update_quotes_precision(
591                &mut quotes,
592                price_precision,
593                size_precision,
594                current_price_precision,
595                current_size_precision,
596            );
597        }
598
599        let quote = parse_quote_record(
600            &data,
601            current_price_precision,
602            current_size_precision,
603            instrument_id,
604        );
605
606        quotes.push(quote);
607
608        if let Some(limit) = limit
609            && quotes.len() >= limit
610        {
611            break;
612        }
613    }
614
615    Ok(quotes)
616}
617
618/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
619/// automatically applying `GZip` decompression for files ending in ".gz".
620/// Load trade ticks from a CSV or gzipped CSV file.
621///
622/// # Errors
623///
624/// Returns an error if the file cannot be opened, read, or parsed as CSV.
625///
626/// # Panics
627///
628/// Panics if a record has invalid trade size or CSV parsing errors.
629pub fn load_trades<P: AsRef<Path>>(
630    filepath: P,
631    price_precision: Option<u8>,
632    size_precision: Option<u8>,
633    instrument_id: Option<InstrumentId>,
634    limit: Option<usize>,
635) -> Result<Vec<TradeTick>, Box<dyn Error>> {
636    // Estimate capacity for Vec pre-allocation
637    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
638    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
639
640    let mut current_price_precision = price_precision.unwrap_or(0);
641    let mut current_size_precision = size_precision.unwrap_or(0);
642    let mut reader = create_csv_reader(filepath)?;
643    let mut record = StringRecord::new();
644
645    while reader.read_record(&mut record)? {
646        let data: TardisTradeRecord = record.deserialize(None)?;
647
648        // Update precisions dynamically if not explicitly set
649        let mut precision_updated = false;
650
651        if price_precision.is_none() {
652            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
653            if inferred_price_precision > current_price_precision {
654                current_price_precision = inferred_price_precision;
655                precision_updated = true;
656            }
657        }
658
659        if size_precision.is_none() {
660            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
661            if inferred_size_precision > current_size_precision {
662                current_size_precision = inferred_size_precision;
663                precision_updated = true;
664            }
665        }
666
667        // If precision increased, update all previous trades
668        if precision_updated {
669            update_trades_precision(
670                &mut trades,
671                price_precision,
672                size_precision,
673                current_price_precision,
674                current_size_precision,
675            );
676        }
677
678        let size = Quantity::new_checked(data.amount, current_size_precision)?;
679
680        if size.is_positive() {
681            let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
682
683            trades.push(trade);
684
685            if let Some(limit) = limit
686                && trades.len() >= limit
687            {
688                break;
689            }
690        } else {
691            log::warn!("Skipping zero-sized trade: {data:?}");
692        }
693    }
694
695    Ok(trades)
696}
697
698/// Loads [`FundingRateUpdate`]s from a Tardis format derivative ticker CSV at the given `filepath`,
699/// automatically applying `GZip` decompression for files ending in ".gz".
700///
701/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
702/// fields from derivative ticker data to create funding rate updates.
703///
704/// # Errors
705///
706/// Returns an error if the file cannot be opened, read, or parsed as CSV.
707pub fn load_funding_rates<P: AsRef<Path>>(
708    filepath: P,
709    instrument_id: Option<InstrumentId>,
710    limit: Option<usize>,
711) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
712    // Estimate capacity for Vec pre-allocation
713    let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
714    let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
715
716    let mut reader = create_csv_reader(filepath)?;
717    let mut record = StringRecord::new();
718
719    while reader.read_record(&mut record)? {
720        let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
721
722        // Parse to funding rate update (returns None if no funding data)
723        if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
724            funding_rates.push(funding_rate);
725
726            if let Some(limit) = limit
727                && funding_rates.len() >= limit
728            {
729                break;
730            }
731        }
732    }
733
734    Ok(funding_rates)
735}
736
737////////////////////////////////////////////////////////////////////////////////
738// Tests
739////////////////////////////////////////////////////////////////////////////////
740#[cfg(test)]
741mod tests {
742    use nautilus_model::{
743        enums::{AggressorSide, BookAction},
744        identifiers::TradeId,
745        types::Price,
746    };
747    use nautilus_testkit::common::{
748        get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
749        get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
750        get_tardis_huobi_quotes_path,
751    };
752    use rstest::*;
753
754    use super::*;
755    use crate::{parse::parse_price, tests::get_test_data_path};
756
757    #[rstest]
758    #[case(0.0, 0)]
759    #[case(42.0, 0)]
760    #[case(0.1, 1)]
761    #[case(0.25, 2)]
762    #[case(123.0001, 4)]
763    #[case(-42.987654321,       9)]
764    #[case(1.234_567_890_123, 12)]
765    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
766        assert_eq!(infer_precision(input), expected);
767    }
768
769    #[rstest]
770    pub fn test_dynamic_precision_inference() {
771        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
772binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
773binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
774binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
775binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
776binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
777
778        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
779        std::fs::write(&temp_file, csv_data).unwrap();
780
781        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
782
783        assert_eq!(deltas.len(), 5);
784
785        for (i, delta) in deltas.iter().enumerate() {
786            assert_eq!(
787                delta.order.price.precision, 4,
788                "Price precision should be 4 for delta {i}",
789            );
790            assert_eq!(
791                delta.order.size.precision, 1,
792                "Size precision should be 1 for delta {i}",
793            );
794        }
795
796        // Test exact values to ensure retroactive precision updates work correctly
797        assert_eq!(deltas[0].order.price, parse_price(50000.0, 4));
798        assert_eq!(deltas[0].order.size, Quantity::new(1.0, 1));
799
800        assert_eq!(deltas[1].order.price, parse_price(49999.5, 4));
801        assert_eq!(deltas[1].order.size, Quantity::new(2.0, 1));
802
803        assert_eq!(deltas[2].order.price, parse_price(50000.12, 4));
804        assert_eq!(deltas[2].order.size, Quantity::new(1.5, 1));
805
806        assert_eq!(deltas[3].order.price, parse_price(49999.123, 4));
807        assert_eq!(deltas[3].order.size, Quantity::new(3.0, 1));
808
809        assert_eq!(deltas[4].order.price, parse_price(50000.1234, 4));
810        assert_eq!(deltas[4].order.size, Quantity::new(0.5, 1));
811
812        assert_eq!(
813            deltas[0].order.price.precision,
814            deltas[4].order.price.precision
815        );
816        assert_eq!(
817            deltas[0].order.size.precision,
818            deltas[2].order.size.precision
819        );
820
821        std::fs::remove_file(&temp_file).ok();
822    }
823
824    #[rstest]
825    #[case(Some(1), Some(0))] // Explicit precisions
826    #[case(None, None)] // Inferred precisions
827    pub fn test_read_deltas(
828        #[case] price_precision: Option<u8>,
829        #[case] size_precision: Option<u8>,
830    ) {
831        let filepath = get_tardis_deribit_book_l2_path();
832        let deltas =
833            load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
834
835        assert_eq!(deltas.len(), 15);
836        assert_eq!(
837            deltas[0].instrument_id,
838            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
839        );
840        assert_eq!(deltas[0].action, BookAction::Add);
841        assert_eq!(deltas[0].order.side, OrderSide::Sell);
842        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
843        assert_eq!(deltas[0].order.size, Quantity::from("18640"));
844        assert_eq!(deltas[0].flags, 0);
845        assert_eq!(deltas[0].sequence, 0);
846        assert_eq!(deltas[0].ts_event, 1585699200245000000);
847        assert_eq!(deltas[0].ts_init, 1585699200355684000);
848    }
849
850    #[rstest]
851    #[case(Some(2), Some(3))] // Explicit precisions
852    #[case(None, None)] // Inferred precisions
853    pub fn test_read_depth10s_from_snapshot5(
854        #[case] price_precision: Option<u8>,
855        #[case] size_precision: Option<u8>,
856    ) {
857        let filepath = get_tardis_binance_snapshot5_path();
858        let depths =
859            load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
860                .unwrap();
861
862        assert_eq!(depths.len(), 10);
863        assert_eq!(
864            depths[0].instrument_id,
865            InstrumentId::from("BTCUSDT.BINANCE")
866        );
867        assert_eq!(depths[0].bids.len(), 10);
868        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
869        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
870        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
871        assert_eq!(depths[0].bids[0].order_id, 0);
872        assert_eq!(depths[0].asks.len(), 10);
873        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
874        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
875        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
876        assert_eq!(depths[0].asks[0].order_id, 0);
877        assert_eq!(depths[0].bid_counts[0], 1);
878        assert_eq!(depths[0].ask_counts[0], 1);
879        assert_eq!(depths[0].flags, 128);
880        assert_eq!(depths[0].ts_event, 1598918403696000000);
881        assert_eq!(depths[0].ts_init, 1598918403810979000);
882        assert_eq!(depths[0].sequence, 0);
883    }
884
885    #[rstest]
886    #[case(Some(2), Some(3))] // Explicit precisions
887    #[case(None, None)] // Inferred precisions
888    pub fn test_read_depth10s_from_snapshot25(
889        #[case] price_precision: Option<u8>,
890        #[case] size_precision: Option<u8>,
891    ) {
892        let filepath = get_tardis_binance_snapshot25_path();
893        let depths = load_depth10_from_snapshot25(
894            filepath,
895            price_precision,
896            size_precision,
897            None,
898            Some(100),
899        )
900        .unwrap();
901
902        assert_eq!(depths.len(), 10);
903        assert_eq!(
904            depths[0].instrument_id,
905            InstrumentId::from("BTCUSDT.BINANCE")
906        );
907        assert_eq!(depths[0].bids.len(), 10);
908        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
909        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
910        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
911        assert_eq!(depths[0].bids[0].order_id, 0);
912        assert_eq!(depths[0].asks.len(), 10);
913        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
914        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
915        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
916        assert_eq!(depths[0].asks[0].order_id, 0);
917        assert_eq!(depths[0].bid_counts[0], 1);
918        assert_eq!(depths[0].ask_counts[0], 1);
919        assert_eq!(depths[0].flags, 128);
920        assert_eq!(depths[0].ts_event, 1598918403696000000);
921        assert_eq!(depths[0].ts_init, 1598918403810979000);
922        assert_eq!(depths[0].sequence, 0);
923    }
924
925    #[rstest]
926    #[case(Some(1), Some(0))] // Explicit precisions
927    #[case(None, None)] // Inferred precisions
928    pub fn test_read_quotes(
929        #[case] price_precision: Option<u8>,
930        #[case] size_precision: Option<u8>,
931    ) {
932        let filepath = get_tardis_huobi_quotes_path();
933        let quotes =
934            load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
935
936        assert_eq!(quotes.len(), 10);
937        assert_eq!(
938            quotes[0].instrument_id,
939            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
940        );
941        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
942        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
943        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
944        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
945        assert_eq!(quotes[0].ts_event, 1588291201099000000);
946        assert_eq!(quotes[0].ts_init, 1588291201234268000);
947    }
948
949    #[rstest]
950    #[case(Some(1), Some(0))] // Explicit precisions
951    #[case(None, None)] // Inferred precisions
952    pub fn test_read_trades(
953        #[case] price_precision: Option<u8>,
954        #[case] size_precision: Option<u8>,
955    ) {
956        let filepath = get_tardis_bitmex_trades_path();
957        let trades =
958            load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
959
960        assert_eq!(trades.len(), 10);
961        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
962        assert_eq!(trades[0].price, Price::from("8531.5"));
963        assert_eq!(trades[0].size, Quantity::from("2152"));
964        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
965        assert_eq!(
966            trades[0].trade_id,
967            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
968        );
969        assert_eq!(trades[0].ts_event, 1583020803145000000);
970        assert_eq!(trades[0].ts_init, 1583020803307160000);
971    }
972
973    #[rstest]
974    pub fn test_load_trades_with_zero_sized_trade() {
975        // Create test CSV data with one zero-sized trade that should be skipped
976        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
977binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
978binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
979binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
980binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
981
982        let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
983        std::fs::write(&temp_file, csv_data).unwrap();
984
985        let trades = load_trades(
986            &temp_file,
987            Some(4),
988            Some(1),
989            None,
990            None, // No limit, load all
991        )
992        .unwrap();
993
994        // Should have 3 trades (zero-sized trade skipped)
995        assert_eq!(trades.len(), 3);
996
997        // Verify the correct trades were loaded (not the zero-sized one)
998        assert_eq!(trades[0].size, Quantity::from("1.0"));
999        assert_eq!(trades[1].size, Quantity::from("1.5"));
1000        assert_eq!(trades[2].size, Quantity::from("3.0"));
1001
1002        // Verify trade IDs to confirm correct trades were loaded
1003        assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1004        assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1005        assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1006
1007        std::fs::remove_file(&temp_file).ok();
1008    }
1009
1010    #[rstest]
1011    pub fn test_load_trades_from_local_file() {
1012        let filepath = get_test_data_path("csv/trades_1.csv");
1013        let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1014        assert_eq!(trades.len(), 2);
1015        assert_eq!(trades[0].price, Price::from("8531.5"));
1016        assert_eq!(trades[1].size, Quantity::from("1000"));
1017    }
1018
1019    #[rstest]
1020    pub fn test_load_deltas_from_local_file() {
1021        let filepath = get_test_data_path("csv/deltas_1.csv");
1022        let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1023        assert_eq!(deltas.len(), 2);
1024        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
1025        assert_eq!(deltas[1].order.size, Quantity::from("10000"));
1026    }
1027
1028    #[rstest]
1029    fn test_load_depth10_from_snapshot5_comprehensive() {
1030        let filepath = get_tardis_binance_snapshot5_path();
1031        let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1032
1033        assert_eq!(depths.len(), 10);
1034
1035        let first = &depths[0];
1036        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1037        assert_eq!(first.bids.len(), 10);
1038        assert_eq!(first.asks.len(), 10);
1039
1040        // Check all bid levels (first 5 from data, rest empty)
1041        assert_eq!(first.bids[0].price, Price::from("11657.07"));
1042        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1043        assert_eq!(first.bids[0].side, OrderSide::Buy);
1044
1045        assert_eq!(first.bids[1].price, Price::from("11656.97"));
1046        assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1047        assert_eq!(first.bids[1].side, OrderSide::Buy);
1048
1049        assert_eq!(first.bids[2].price, Price::from("11655.78"));
1050        assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1051        assert_eq!(first.bids[2].side, OrderSide::Buy);
1052
1053        assert_eq!(first.bids[3].price, Price::from("11655.77"));
1054        assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1055        assert_eq!(first.bids[3].side, OrderSide::Buy);
1056
1057        assert_eq!(first.bids[4].price, Price::from("11655.68"));
1058        assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1059        assert_eq!(first.bids[4].side, OrderSide::Buy);
1060
1061        // Empty levels
1062        for i in 5..10 {
1063            assert_eq!(first.bids[i].price.raw, 0);
1064            assert_eq!(first.bids[i].size.raw, 0);
1065            assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1066        }
1067
1068        // Check all ask levels (first 5 from data, rest empty)
1069        assert_eq!(first.asks[0].price, Price::from("11657.08"));
1070        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1071        assert_eq!(first.asks[0].side, OrderSide::Sell);
1072
1073        assert_eq!(first.asks[1].price, Price::from("11657.54"));
1074        assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1075        assert_eq!(first.asks[1].side, OrderSide::Sell);
1076
1077        assert_eq!(first.asks[2].price, Price::from("11657.56"));
1078        assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1079        assert_eq!(first.asks[2].side, OrderSide::Sell);
1080
1081        assert_eq!(first.asks[3].price, Price::from("11657.61"));
1082        assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1083        assert_eq!(first.asks[3].side, OrderSide::Sell);
1084
1085        assert_eq!(first.asks[4].price, Price::from("11657.92"));
1086        assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1087        assert_eq!(first.asks[4].side, OrderSide::Sell);
1088
1089        // Empty levels
1090        for i in 5..10 {
1091            assert_eq!(first.asks[i].price.raw, 0);
1092            assert_eq!(first.asks[i].size.raw, 0);
1093            assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1094        }
1095
1096        // Logical checks: bid prices should decrease
1097        for i in 1..5 {
1098            assert!(
1099                first.bids[i].price < first.bids[i - 1].price,
1100                "Bid price at level {} should be less than level {}",
1101                i,
1102                i - 1
1103            );
1104        }
1105
1106        // Logical checks: ask prices should increase
1107        for i in 1..5 {
1108            assert!(
1109                first.asks[i].price > first.asks[i - 1].price,
1110                "Ask price at level {} should be greater than level {}",
1111                i,
1112                i - 1
1113            );
1114        }
1115
1116        // Logical check: spread should be positive
1117        assert!(
1118            first.asks[0].price > first.bids[0].price,
1119            "Best ask should be greater than best bid"
1120        );
1121
1122        // Check counts
1123        for i in 0..5 {
1124            assert_eq!(first.bid_counts[i], 1);
1125            assert_eq!(first.ask_counts[i], 1);
1126        }
1127        for i in 5..10 {
1128            assert_eq!(first.bid_counts[i], 0);
1129            assert_eq!(first.ask_counts[i], 0);
1130        }
1131
1132        // Check metadata
1133        assert_eq!(first.flags, 128); // F_SNAPSHOT flag
1134        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1135        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1136        assert_eq!(first.sequence, 0);
1137    }
1138
1139    #[rstest]
1140    fn test_load_depth10_from_snapshot25_comprehensive() {
1141        let filepath = get_tardis_binance_snapshot25_path();
1142        let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1143
1144        assert_eq!(depths.len(), 10);
1145
1146        let first = &depths[0];
1147        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1148        assert_eq!(first.bids.len(), 10);
1149        assert_eq!(first.asks.len(), 10);
1150
1151        // Check all 10 bid levels from snapshot25
1152        let expected_bids = vec![
1153            ("11657.07", "10.896"),
1154            ("11656.97", "0.2"),
1155            ("11655.78", "0.2"),
1156            ("11655.77", "0.98"),
1157            ("11655.68", "0.111"),
1158            ("11655.66", "0.077"),
1159            ("11655.57", "0.34"),
1160            ("11655.48", "0.4"),
1161            ("11655.26", "1.185"),
1162            ("11654.86", "0.195"),
1163        ];
1164
1165        for (i, (price, size)) in expected_bids.iter().enumerate() {
1166            assert_eq!(first.bids[i].price, Price::from(*price));
1167            assert_eq!(first.bids[i].size, Quantity::from(*size));
1168            assert_eq!(first.bids[i].side, OrderSide::Buy);
1169        }
1170
1171        // Check all 10 ask levels from snapshot25
1172        let expected_asks = vec![
1173            ("11657.08", "1.714"),
1174            ("11657.54", "5.4"),
1175            ("11657.56", "0.238"),
1176            ("11657.61", "0.077"),
1177            ("11657.92", "0.918"),
1178            ("11658.09", "1.015"),
1179            ("11658.12", "0.665"),
1180            ("11658.19", "0.583"),
1181            ("11658.28", "0.255"),
1182            ("11658.29", "0.656"),
1183        ];
1184
1185        for (i, (price, size)) in expected_asks.iter().enumerate() {
1186            assert_eq!(first.asks[i].price, Price::from(*price));
1187            assert_eq!(first.asks[i].size, Quantity::from(*size));
1188            assert_eq!(first.asks[i].side, OrderSide::Sell);
1189        }
1190
1191        // Logical checks: bid prices should strictly decrease
1192        for i in 1..10 {
1193            assert!(
1194                first.bids[i].price < first.bids[i - 1].price,
1195                "Bid price at level {} ({}) should be less than level {} ({})",
1196                i,
1197                first.bids[i].price,
1198                i - 1,
1199                first.bids[i - 1].price
1200            );
1201        }
1202
1203        // Logical checks: ask prices should strictly increase
1204        for i in 1..10 {
1205            assert!(
1206                first.asks[i].price > first.asks[i - 1].price,
1207                "Ask price at level {} ({}) should be greater than level {} ({})",
1208                i,
1209                first.asks[i].price,
1210                i - 1,
1211                first.asks[i - 1].price
1212            );
1213        }
1214
1215        // Logical check: spread should be positive
1216        assert!(
1217            first.asks[0].price > first.bids[0].price,
1218            "Best ask ({}) should be greater than best bid ({})",
1219            first.asks[0].price,
1220            first.bids[0].price
1221        );
1222
1223        // Check counts (all should be 1 for snapshot data)
1224        for i in 0..10 {
1225            assert_eq!(first.bid_counts[i], 1);
1226            assert_eq!(first.ask_counts[i], 1);
1227        }
1228
1229        // Check metadata
1230        assert_eq!(first.flags, 128); // F_SNAPSHOT flag
1231        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1232        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1233        assert_eq!(first.sequence, 0);
1234    }
1235
1236    #[rstest]
1237    fn test_snapshot_csv_field_order_interleaved() {
1238        // This test verifies that the CSV structs correctly handle the interleaved
1239        // asks/bids field ordering from Tardis CSV files
1240
1241        let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1242asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1243asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1244asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1245asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1246asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1247binance-futures,BTCUSDT,1000000,2000000,\
1248100.5,1.0,100.4,2.0,\
1249100.6,1.1,100.3,2.1,\
1250100.7,1.2,100.2,2.2,\
1251100.8,1.3,100.1,2.3,\
1252100.9,1.4,100.0,2.4";
1253
1254        let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1255        std::fs::write(&temp_file, csv_data).unwrap();
1256
1257        let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1258        assert_eq!(depths.len(), 1);
1259
1260        let depth = &depths[0];
1261
1262        // Verify bids are correctly parsed (should be decreasing)
1263        assert_eq!(depth.bids[0].price, Price::from("100.4"));
1264        assert_eq!(depth.bids[1].price, Price::from("100.3"));
1265        assert_eq!(depth.bids[2].price, Price::from("100.2"));
1266        assert_eq!(depth.bids[3].price, Price::from("100.1"));
1267        assert_eq!(depth.bids[4].price, Price::from("100.0"));
1268
1269        // Verify asks are correctly parsed (should be increasing)
1270        assert_eq!(depth.asks[0].price, Price::from("100.5"));
1271        assert_eq!(depth.asks[1].price, Price::from("100.6"));
1272        assert_eq!(depth.asks[2].price, Price::from("100.7"));
1273        assert_eq!(depth.asks[3].price, Price::from("100.8"));
1274        assert_eq!(depth.asks[4].price, Price::from("100.9"));
1275
1276        // Verify sizes
1277        assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1278        assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1279
1280        std::fs::remove_file(temp_file).unwrap();
1281    }
1282}