nautilus_tardis/csv/
load.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick,
24    },
25    enums::{OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31    csv::{
32        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34        record::{
35            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37        },
38    },
39    parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43    if explicit.is_some() {
44        return false;
45    }
46
47    let inferred = infer_precision(value).min(FIXED_PRECISION);
48    if inferred > *current {
49        *current = inferred;
50        true
51    } else {
52        false
53    }
54}
55
56fn update_deltas_precision(
57    deltas: &mut [OrderBookDelta],
58    price_precision: Option<u8>,
59    size_precision: Option<u8>,
60    current_price_precision: u8,
61    current_size_precision: u8,
62) {
63    for delta in deltas {
64        if price_precision.is_none() {
65            delta.order.price.precision = current_price_precision;
66        }
67        if size_precision.is_none() {
68            delta.order.size.precision = current_size_precision;
69        }
70    }
71}
72
73fn update_quotes_precision(
74    quotes: &mut [QuoteTick],
75    price_precision: Option<u8>,
76    size_precision: Option<u8>,
77    current_price_precision: u8,
78    current_size_precision: u8,
79) {
80    for quote in quotes {
81        if price_precision.is_none() {
82            quote.bid_price.precision = current_price_precision;
83            quote.ask_price.precision = current_price_precision;
84        }
85        if size_precision.is_none() {
86            quote.bid_size.precision = current_size_precision;
87            quote.ask_size.precision = current_size_precision;
88        }
89    }
90}
91
92fn update_trades_precision(
93    trades: &mut [TradeTick],
94    price_precision: Option<u8>,
95    size_precision: Option<u8>,
96    current_price_precision: u8,
97    current_size_precision: u8,
98) {
99    for trade in trades {
100        if price_precision.is_none() {
101            trade.price.precision = current_price_precision;
102        }
103        if size_precision.is_none() {
104            trade.size.precision = current_size_precision;
105        }
106    }
107}
108
/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// When `price_precision` / `size_precision` are `None`, precisions are inferred
/// from the data as records are read and applied retroactively to all deltas
/// once the file has been consumed. A CLEAR delta is inserted at each snapshot
/// boundary so downstream book state is reset before the snapshot is applied.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
pub fn load_deltas<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation (capped to avoid huge upfront allocations)
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);

    // Start at the explicit precision if given, otherwise 0 and grow as inferred
    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut last_ts_event = UnixNanos::default();
    let mut last_is_snapshot = false;

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        if let Some(limit) = limit
            && deltas.len() >= limit
        {
            break;
        }

        let data: TardisBookUpdateRecord = record.deserialize(None)?;

        // Grow the running precisions if this record needs more decimal places
        update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
        update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);

        // Insert CLEAR on snapshot boundary to reset order book state
        if data.is_snapshot && !last_is_snapshot {
            let clear_instrument_id =
                instrument_id.unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
            let ts_event = parse_timestamp(data.timestamp);
            let ts_init = parse_timestamp(data.local_timestamp);

            // Close out the previous event group before starting a new one
            if last_ts_event != ts_event
                && let Some(last_delta) = deltas.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            last_ts_event = ts_event;

            let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
            deltas.push(clear_delta);

            // The CLEAR delta itself may have hit the limit
            if let Some(limit) = limit
                && deltas.len() >= limit
            {
                break;
            }
        }
        last_is_snapshot = data.is_snapshot;

        // Invalid records are skipped (logged) rather than aborting the load
        let delta = match parse_delta_record(
            &data,
            current_price_precision,
            current_size_precision,
            instrument_id,
        ) {
            Ok(d) => d,
            Err(e) => {
                tracing::warn!("Skipping invalid delta record: {e}");
                continue;
            }
        };

        // Mark the previous delta as last-in-group when the event timestamp advances
        let ts_event = delta.ts_event;
        if last_ts_event != ts_event
            && let Some(last_delta) = deltas.last_mut()
        {
            last_delta.flags = RecordFlag::F_LAST.value();
        }

        last_ts_event = ts_event;

        deltas.push(delta);
    }

    // Set F_LAST flag for final delta
    if let Some(last_delta) = deltas.last_mut() {
        last_delta.flags = RecordFlag::F_LAST.value();
    }

    // Update all deltas to use the final (maximum) precision discovered
    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
    update_deltas_precision(
        &mut deltas,
        price_precision,
        size_precision,
        current_price_precision,
        current_size_precision,
    );

    Ok(deltas)
}
218
219/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
220/// automatically applying `GZip` decompression for files ending in ".gz".
221/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
222///
223/// # Errors
224///
225/// Returns an error if the file cannot be opened, read, or parsed as CSV.
226///
227/// # Panics
228///
229/// Panics if a record level cannot be parsed to depth-10.
230pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
231    filepath: P,
232    price_precision: Option<u8>,
233    size_precision: Option<u8>,
234    instrument_id: Option<InstrumentId>,
235    limit: Option<usize>,
236) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
237    // Estimate capacity for Vec pre-allocation
238    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
239    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
240
241    let mut current_price_precision = price_precision.unwrap_or(0);
242    let mut current_size_precision = size_precision.unwrap_or(0);
243
244    let mut reader = create_csv_reader(filepath)?;
245    let mut record = StringRecord::new();
246
247    while reader.read_record(&mut record)? {
248        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
249
250        // Update precisions dynamically if not explicitly set
251        let mut precision_updated = false;
252
253        if price_precision.is_none()
254            && let Some(bid_price) = data.bids_0_price
255        {
256            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
257            if inferred_price_precision > current_price_precision {
258                current_price_precision = inferred_price_precision;
259                precision_updated = true;
260            }
261        }
262
263        if size_precision.is_none()
264            && let Some(bid_amount) = data.bids_0_amount
265        {
266            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
267            if inferred_size_precision > current_size_precision {
268                current_size_precision = inferred_size_precision;
269                precision_updated = true;
270            }
271        }
272
273        // If precision increased, update all previous depths
274        if precision_updated {
275            for depth in &mut depths {
276                for i in 0..DEPTH10_LEN {
277                    if price_precision.is_none() {
278                        depth.bids[i].price.precision = current_price_precision;
279                        depth.asks[i].price.precision = current_price_precision;
280                    }
281                    if size_precision.is_none() {
282                        depth.bids[i].size.precision = current_size_precision;
283                        depth.asks[i].size.precision = current_size_precision;
284                    }
285                }
286            }
287        }
288
289        let instrument_id = match &instrument_id {
290            Some(id) => *id,
291            None => parse_instrument_id(&data.exchange, data.symbol),
292        };
293        // Mark as both snapshot and last (consistent with streaming implementation)
294        let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
295        let sequence = 0; // Sequence not available
296        let ts_event = parse_timestamp(data.timestamp);
297        let ts_init = parse_timestamp(data.local_timestamp);
298
299        // Initialize empty arrays
300        let mut bids = [NULL_ORDER; DEPTH10_LEN];
301        let mut asks = [NULL_ORDER; DEPTH10_LEN];
302        let mut bid_counts = [0u32; DEPTH10_LEN];
303        let mut ask_counts = [0u32; DEPTH10_LEN];
304
305        for i in 0..=4 {
306            // Create bids
307            let (bid_order, bid_count) = create_book_order(
308                OrderSide::Buy,
309                match i {
310                    0 => data.bids_0_price,
311                    1 => data.bids_1_price,
312                    2 => data.bids_2_price,
313                    3 => data.bids_3_price,
314                    4 => data.bids_4_price,
315                    _ => unreachable!("i is constrained to 0..=4 by loop"),
316                },
317                match i {
318                    0 => data.bids_0_amount,
319                    1 => data.bids_1_amount,
320                    2 => data.bids_2_amount,
321                    3 => data.bids_3_amount,
322                    4 => data.bids_4_amount,
323                    _ => unreachable!("i is constrained to 0..=4 by loop"),
324                },
325                current_price_precision,
326                current_size_precision,
327            );
328            bids[i] = bid_order;
329            bid_counts[i] = bid_count;
330
331            // Create asks
332            let (ask_order, ask_count) = create_book_order(
333                OrderSide::Sell,
334                match i {
335                    0 => data.asks_0_price,
336                    1 => data.asks_1_price,
337                    2 => data.asks_2_price,
338                    3 => data.asks_3_price,
339                    4 => data.asks_4_price,
340                    _ => None, // Unreachable, but for safety
341                },
342                match i {
343                    0 => data.asks_0_amount,
344                    1 => data.asks_1_amount,
345                    2 => data.asks_2_amount,
346                    3 => data.asks_3_amount,
347                    4 => data.asks_4_amount,
348                    _ => None, // Unreachable, but for safety
349                },
350                current_price_precision,
351                current_size_precision,
352            );
353            asks[i] = ask_order;
354            ask_counts[i] = ask_count;
355        }
356
357        let depth = OrderBookDepth10::new(
358            instrument_id,
359            bids,
360            asks,
361            bid_counts,
362            ask_counts,
363            flags,
364            sequence,
365            ts_event,
366            ts_init,
367        );
368
369        depths.push(depth);
370
371        if let Some(limit) = limit
372            && depths.len() >= limit
373        {
374            break;
375        }
376    }
377
378    Ok(depths)
379}
380
/// Loads [`OrderBookDepth10`]s from a Tardis format 25-level snapshot CSV at
/// the given `filepath`, automatically applying `GZip` decompression for files
/// ending in ".gz". Only the first ten levels of each record are used; the
/// remaining fifteen levels in the source data are discarded.
///
/// When `price_precision` / `size_precision` are `None`, precisions are inferred
/// from the top-of-book bid columns and retroactively applied to previously
/// loaded depths whenever the inferred precision increases.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation (capped to avoid huge upfront allocations)
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);

    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;

        // Update precisions dynamically if not explicitly set
        let mut precision_updated = false;

        if price_precision.is_none()
            && let Some(bid_price) = data.bids_0_price
        {
            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
            if inferred_price_precision > current_price_precision {
                current_price_precision = inferred_price_precision;
                precision_updated = true;
            }
        }

        if size_precision.is_none()
            && let Some(bid_amount) = data.bids_0_amount
        {
            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
            if inferred_size_precision > current_size_precision {
                current_size_precision = inferred_size_precision;
                precision_updated = true;
            }
        }

        // If precision increased, update all previous depths
        if precision_updated {
            for depth in &mut depths {
                for i in 0..DEPTH10_LEN {
                    if price_precision.is_none() {
                        depth.bids[i].price.precision = current_price_precision;
                        depth.asks[i].price.precision = current_price_precision;
                    }
                    if size_precision.is_none() {
                        depth.bids[i].size.precision = current_size_precision;
                        depth.asks[i].size.precision = current_size_precision;
                    }
                }
            }
        }

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&data.exchange, data.symbol),
        };
        // Mark as both snapshot and last (consistent with streaming implementation)
        let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
        let sequence = 0; // Sequence not available
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        // Initialize empty arrays for the first 10 levels only
        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0u32; DEPTH10_LEN];
        let mut ask_counts = [0u32; DEPTH10_LEN];

        // Fill only the first 10 levels from the 25-level record
        for i in 0..DEPTH10_LEN {
            // Create bids
            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                match i {
                    0 => data.bids_0_price,
                    1 => data.bids_1_price,
                    2 => data.bids_2_price,
                    3 => data.bids_3_price,
                    4 => data.bids_4_price,
                    5 => data.bids_5_price,
                    6 => data.bids_6_price,
                    7 => data.bids_7_price,
                    8 => data.bids_8_price,
                    9 => data.bids_9_price,
                    _ => unreachable!("i is constrained to 0..10 by loop"),
                },
                match i {
                    0 => data.bids_0_amount,
                    1 => data.bids_1_amount,
                    2 => data.bids_2_amount,
                    3 => data.bids_3_amount,
                    4 => data.bids_4_amount,
                    5 => data.bids_5_amount,
                    6 => data.bids_6_amount,
                    7 => data.bids_7_amount,
                    8 => data.bids_8_amount,
                    9 => data.bids_9_amount,
                    _ => unreachable!("i is constrained to 0..10 by loop"),
                },
                current_price_precision,
                current_size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            // Create asks
            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                match i {
                    0 => data.asks_0_price,
                    1 => data.asks_1_price,
                    2 => data.asks_2_price,
                    3 => data.asks_3_price,
                    4 => data.asks_4_price,
                    5 => data.asks_5_price,
                    6 => data.asks_6_price,
                    7 => data.asks_7_price,
                    8 => data.asks_8_price,
                    9 => data.asks_9_price,
                    _ => unreachable!("i is constrained to 0..10 by loop"),
                },
                match i {
                    0 => data.asks_0_amount,
                    1 => data.asks_1_amount,
                    2 => data.asks_2_amount,
                    3 => data.asks_3_amount,
                    4 => data.asks_4_amount,
                    5 => data.asks_5_amount,
                    6 => data.asks_6_amount,
                    7 => data.asks_7_amount,
                    8 => data.asks_8_amount,
                    9 => data.asks_9_amount,
                    _ => unreachable!("i is constrained to 0..10 by loop"),
                },
                current_price_precision,
                current_size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let depth = OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        );

        depths.push(depth);

        if let Some(limit) = limit
            && depths.len() >= limit
        {
            break;
        }
    }

    Ok(depths)
}
558
559/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
560/// automatically applying `GZip` decompression for files ending in ".gz".
561/// Load quote ticks from a CSV or gzipped CSV file.
562///
563/// # Errors
564///
565/// Returns an error if the file cannot be opened, read, or parsed as CSV.
566///
567/// # Panics
568///
569/// Panics if a record has invalid data or CSV parsing errors.
570pub fn load_quotes<P: AsRef<Path>>(
571    filepath: P,
572    price_precision: Option<u8>,
573    size_precision: Option<u8>,
574    instrument_id: Option<InstrumentId>,
575    limit: Option<usize>,
576) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
577    // Estimate capacity for Vec pre-allocation
578    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
579    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
580
581    let mut current_price_precision = price_precision.unwrap_or(0);
582    let mut current_size_precision = size_precision.unwrap_or(0);
583    let mut reader = create_csv_reader(filepath)?;
584    let mut record = StringRecord::new();
585
586    while reader.read_record(&mut record)? {
587        let data: TardisQuoteRecord = record.deserialize(None)?;
588
589        if price_precision.is_none()
590            && let Some(bid_price) = data.bid_price
591        {
592            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
593            if inferred_price_precision > current_price_precision {
594                current_price_precision = inferred_price_precision;
595            }
596        }
597
598        if size_precision.is_none()
599            && let Some(bid_amount) = data.bid_amount
600        {
601            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
602            if inferred_size_precision > current_size_precision {
603                current_size_precision = inferred_size_precision;
604            }
605        }
606
607        let quote = parse_quote_record(
608            &data,
609            current_price_precision,
610            current_size_precision,
611            instrument_id,
612        );
613
614        quotes.push(quote);
615
616        if let Some(limit) = limit
617            && quotes.len() >= limit
618        {
619            break;
620        }
621    }
622
623    // Update all quotes to use the final (maximum) precision discovered
624    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
625    update_quotes_precision(
626        &mut quotes,
627        price_precision,
628        size_precision,
629        current_price_precision,
630        current_size_precision,
631    );
632
633    Ok(quotes)
634}
635
636/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
637/// automatically applying `GZip` decompression for files ending in ".gz".
638/// Load trade ticks from a CSV or gzipped CSV file.
639///
640/// # Errors
641///
642/// Returns an error if the file cannot be opened, read, or parsed as CSV.
643///
644/// # Panics
645///
646/// Panics if a record has invalid trade size or CSV parsing errors.
647pub fn load_trades<P: AsRef<Path>>(
648    filepath: P,
649    price_precision: Option<u8>,
650    size_precision: Option<u8>,
651    instrument_id: Option<InstrumentId>,
652    limit: Option<usize>,
653) -> Result<Vec<TradeTick>, Box<dyn Error>> {
654    // Estimate capacity for Vec pre-allocation
655    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
656    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
657
658    let mut current_price_precision = price_precision.unwrap_or(0);
659    let mut current_size_precision = size_precision.unwrap_or(0);
660    let mut reader = create_csv_reader(filepath)?;
661    let mut record = StringRecord::new();
662
663    while reader.read_record(&mut record)? {
664        let data: TardisTradeRecord = record.deserialize(None)?;
665
666        if price_precision.is_none() {
667            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
668            if inferred_price_precision > current_price_precision {
669                current_price_precision = inferred_price_precision;
670            }
671        }
672
673        if size_precision.is_none() {
674            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
675            if inferred_size_precision > current_size_precision {
676                current_size_precision = inferred_size_precision;
677            }
678        }
679
680        let size = Quantity::new_checked(data.amount, current_size_precision)?;
681
682        if size.is_positive() {
683            let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
684
685            trades.push(trade);
686
687            if let Some(limit) = limit
688                && trades.len() >= limit
689            {
690                break;
691            }
692        } else {
693            log::warn!("Skipping zero-sized trade: {data:?}");
694        }
695    }
696
697    // Update all trades to use the final (maximum) precision discovered
698    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
699    update_trades_precision(
700        &mut trades,
701        price_precision,
702        size_precision,
703        current_price_precision,
704        current_size_precision,
705    );
706
707    Ok(trades)
708}
709
710/// Loads [`FundingRateUpdate`]s from a Tardis format derivative ticker CSV at the given `filepath`,
711/// automatically applying `GZip` decompression for files ending in ".gz".
712///
713/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
714/// fields from derivative ticker data to create funding rate updates.
715///
716/// # Errors
717///
718/// Returns an error if the file cannot be opened, read, or parsed as CSV.
719pub fn load_funding_rates<P: AsRef<Path>>(
720    filepath: P,
721    instrument_id: Option<InstrumentId>,
722    limit: Option<usize>,
723) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
724    // Estimate capacity for Vec pre-allocation
725    let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
726    let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
727
728    let mut reader = create_csv_reader(filepath)?;
729    let mut record = StringRecord::new();
730
731    while reader.read_record(&mut record)? {
732        let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
733
734        // Parse to funding rate update (returns None if no funding data)
735        if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
736            funding_rates.push(funding_rate);
737
738            if let Some(limit) = limit
739                && funding_rates.len() >= limit
740            {
741                break;
742            }
743        }
744    }
745
746    Ok(funding_rates)
747}
748
749#[cfg(test)]
750mod tests {
751    use nautilus_model::{
752        enums::{AggressorSide, BookAction},
753        identifiers::TradeId,
754        types::Price,
755    };
756    use nautilus_testkit::common::{
757        get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
758        get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
759        get_tardis_huobi_quotes_path,
760    };
761    use rstest::*;
762
763    use super::*;
764    use crate::{parse::parse_price, tests::get_test_data_path};
765
    // Verifies decimal-precision inference over integers, fractions, negatives,
    // and high-precision values (each case: input float -> expected precision).
    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321,       9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        assert_eq!(infer_precision(input), expected);
    }
777
    // End-to-end check that `load_deltas` infers the maximum precision seen in
    // the file (price 4dp, size 1dp here) and retroactively applies it to every
    // delta, including the early rows that had fewer decimal places.
    #[rstest]
    pub fn test_dynamic_precision_inference() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        // 5 data rows + 1 CLEAR delta at start (first row is snapshot)
        assert_eq!(deltas.len(), 6);

        // Skip the CLEAR delta at index 0
        for (i, delta) in deltas.iter().skip(1).enumerate() {
            assert_eq!(
                delta.order.price.precision, 4,
                "Price precision should be 4 for delta {i}",
            );
            assert_eq!(
                delta.order.size.precision, 1,
                "Size precision should be 1 for delta {i}",
            );
        }

        // Test exact values to ensure retroactive precision updates work correctly
        // Index 0 is CLEAR, data starts at index 1
        assert_eq!(deltas[0].action, BookAction::Clear);

        assert_eq!(deltas[1].order.price, parse_price(50000.0, 4));
        assert_eq!(deltas[1].order.size, Quantity::new(1.0, 1));

        assert_eq!(deltas[2].order.price, parse_price(49999.5, 4));
        assert_eq!(deltas[2].order.size, Quantity::new(2.0, 1));

        assert_eq!(deltas[3].order.price, parse_price(50000.12, 4));
        assert_eq!(deltas[3].order.size, Quantity::new(1.5, 1));

        assert_eq!(deltas[4].order.price, parse_price(49999.123, 4));
        assert_eq!(deltas[4].order.size, Quantity::new(3.0, 1));

        assert_eq!(deltas[5].order.price, parse_price(50000.1234, 4));
        assert_eq!(deltas[5].order.size, Quantity::new(0.5, 1));

        // First and last deltas must agree on the final inferred precisions
        assert_eq!(
            deltas[1].order.price.precision,
            deltas[5].order.price.precision
        );
        assert_eq!(
            deltas[1].order.size.precision,
            deltas[3].order.size.precision
        );

        std::fs::remove_file(&temp_file).ok();
    }
837
838    #[rstest]
839    #[case(Some(1), Some(0))] // Explicit precisions
840    #[case(None, None)] // Inferred precisions
841    pub fn test_read_deltas(
842        #[case] price_precision: Option<u8>,
843        #[case] size_precision: Option<u8>,
844    ) {
845        let filepath = get_tardis_deribit_book_l2_path();
846        let deltas =
847            load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
848
849        // 15 data rows + 1 CLEAR delta at start (first row is snapshot)
850        assert_eq!(deltas.len(), 16);
851
852        // Index 0 is CLEAR delta
853        assert_eq!(deltas[0].action, BookAction::Clear);
854
855        // Index 1 is first data delta
856        assert_eq!(
857            deltas[1].instrument_id,
858            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
859        );
860        assert_eq!(deltas[1].action, BookAction::Add);
861        assert_eq!(deltas[1].order.side, OrderSide::Sell);
862        assert_eq!(deltas[1].order.price, Price::from("6421.5"));
863        assert_eq!(deltas[1].order.size, Quantity::from("18640"));
864        assert_eq!(deltas[1].flags, 0);
865        assert_eq!(deltas[1].sequence, 0);
866        assert_eq!(deltas[1].ts_event, 1585699200245000000);
867        assert_eq!(deltas[1].ts_init, 1585699200355684000);
868    }
869
870    #[rstest]
871    #[case(Some(2), Some(3))] // Explicit precisions
872    #[case(None, None)] // Inferred precisions
873    pub fn test_read_depth10s_from_snapshot5(
874        #[case] price_precision: Option<u8>,
875        #[case] size_precision: Option<u8>,
876    ) {
877        let filepath = get_tardis_binance_snapshot5_path();
878        let depths =
879            load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
880                .unwrap();
881
882        assert_eq!(depths.len(), 10);
883        assert_eq!(
884            depths[0].instrument_id,
885            InstrumentId::from("BTCUSDT.BINANCE")
886        );
887        assert_eq!(depths[0].bids.len(), 10);
888        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
889        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
890        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
891        assert_eq!(depths[0].bids[0].order_id, 0);
892        assert_eq!(depths[0].asks.len(), 10);
893        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
894        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
895        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
896        assert_eq!(depths[0].asks[0].order_id, 0);
897        assert_eq!(depths[0].bid_counts[0], 1);
898        assert_eq!(depths[0].ask_counts[0], 1);
899        // F_SNAPSHOT (32) | F_LAST (128) = 160
900        assert_eq!(
901            depths[0].flags,
902            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
903        );
904        assert_eq!(depths[0].ts_event, 1598918403696000000);
905        assert_eq!(depths[0].ts_init, 1598918403810979000);
906        assert_eq!(depths[0].sequence, 0);
907    }
908
909    #[rstest]
910    #[case(Some(2), Some(3))] // Explicit precisions
911    #[case(None, None)] // Inferred precisions
912    pub fn test_read_depth10s_from_snapshot25(
913        #[case] price_precision: Option<u8>,
914        #[case] size_precision: Option<u8>,
915    ) {
916        let filepath = get_tardis_binance_snapshot25_path();
917        let depths = load_depth10_from_snapshot25(
918            filepath,
919            price_precision,
920            size_precision,
921            None,
922            Some(100),
923        )
924        .unwrap();
925
926        assert_eq!(depths.len(), 10);
927        assert_eq!(
928            depths[0].instrument_id,
929            InstrumentId::from("BTCUSDT.BINANCE")
930        );
931        assert_eq!(depths[0].bids.len(), 10);
932        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
933        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
934        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
935        assert_eq!(depths[0].bids[0].order_id, 0);
936        assert_eq!(depths[0].asks.len(), 10);
937        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
938        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
939        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
940        assert_eq!(depths[0].asks[0].order_id, 0);
941        assert_eq!(depths[0].bid_counts[0], 1);
942        assert_eq!(depths[0].ask_counts[0], 1);
943        // F_SNAPSHOT (32) | F_LAST (128) = 160
944        assert_eq!(
945            depths[0].flags,
946            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
947        );
948        assert_eq!(depths[0].ts_event, 1598918403696000000);
949        assert_eq!(depths[0].ts_init, 1598918403810979000);
950        assert_eq!(depths[0].sequence, 0);
951    }
952
953    #[rstest]
954    #[case(Some(1), Some(0))] // Explicit precisions
955    #[case(None, None)] // Inferred precisions
956    pub fn test_read_quotes(
957        #[case] price_precision: Option<u8>,
958        #[case] size_precision: Option<u8>,
959    ) {
960        let filepath = get_tardis_huobi_quotes_path();
961        let quotes =
962            load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
963
964        assert_eq!(quotes.len(), 10);
965        assert_eq!(
966            quotes[0].instrument_id,
967            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
968        );
969        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
970        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
971        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
972        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
973        assert_eq!(quotes[0].ts_event, 1588291201099000000);
974        assert_eq!(quotes[0].ts_init, 1588291201234268000);
975    }
976
977    #[rstest]
978    #[case(Some(1), Some(0))] // Explicit precisions
979    #[case(None, None)] // Inferred precisions
980    pub fn test_read_trades(
981        #[case] price_precision: Option<u8>,
982        #[case] size_precision: Option<u8>,
983    ) {
984        let filepath = get_tardis_bitmex_trades_path();
985        let trades =
986            load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
987
988        assert_eq!(trades.len(), 10);
989        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
990        assert_eq!(trades[0].price, Price::from("8531.5"));
991        assert_eq!(trades[0].size, Quantity::from("2152"));
992        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
993        assert_eq!(
994            trades[0].trade_id,
995            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
996        );
997        assert_eq!(trades[0].ts_event, 1583020803145000000);
998        assert_eq!(trades[0].ts_init, 1583020803307160000);
999    }
1000
1001    #[rstest]
1002    pub fn test_load_trades_with_zero_sized_trade() {
1003        // Create test CSV data with one zero-sized trade that should be skipped
1004        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1005binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1006binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1007binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1008binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1009
1010        let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1011        std::fs::write(&temp_file, csv_data).unwrap();
1012
1013        let trades = load_trades(
1014            &temp_file,
1015            Some(4),
1016            Some(1),
1017            None,
1018            None, // No limit, load all
1019        )
1020        .unwrap();
1021
1022        // Should have 3 trades (zero-sized trade skipped)
1023        assert_eq!(trades.len(), 3);
1024
1025        // Verify the correct trades were loaded (not the zero-sized one)
1026        assert_eq!(trades[0].size, Quantity::from("1.0"));
1027        assert_eq!(trades[1].size, Quantity::from("1.5"));
1028        assert_eq!(trades[2].size, Quantity::from("3.0"));
1029
1030        // Verify trade IDs to confirm correct trades were loaded
1031        assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1032        assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1033        assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1034
1035        std::fs::remove_file(&temp_file).ok();
1036    }
1037
1038    #[rstest]
1039    pub fn test_load_trades_from_local_file() {
1040        let filepath = get_test_data_path("csv/trades_1.csv");
1041        let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1042        assert_eq!(trades.len(), 2);
1043        assert_eq!(trades[0].price, Price::from("8531.5"));
1044        assert_eq!(trades[1].size, Quantity::from("1000"));
1045    }
1046
1047    #[rstest]
1048    pub fn test_load_deltas_from_local_file() {
1049        let filepath = get_test_data_path("csv/deltas_1.csv");
1050        let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1051
1052        // 2 data rows + 1 CLEAR delta at start (first row is snapshot)
1053        assert_eq!(deltas.len(), 3);
1054        assert_eq!(deltas[0].action, BookAction::Clear);
1055        assert_eq!(deltas[1].order.price, Price::from("6421.5"));
1056        assert_eq!(deltas[2].order.size, Quantity::from("10000"));
1057    }
1058
1059    #[rstest]
1060    fn test_load_depth10_from_snapshot5_comprehensive() {
1061        let filepath = get_tardis_binance_snapshot5_path();
1062        let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1063
1064        assert_eq!(depths.len(), 10);
1065
1066        let first = &depths[0];
1067        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1068        assert_eq!(first.bids.len(), 10);
1069        assert_eq!(first.asks.len(), 10);
1070
1071        // Check all bid levels (first 5 from data, rest empty)
1072        assert_eq!(first.bids[0].price, Price::from("11657.07"));
1073        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1074        assert_eq!(first.bids[0].side, OrderSide::Buy);
1075
1076        assert_eq!(first.bids[1].price, Price::from("11656.97"));
1077        assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1078        assert_eq!(first.bids[1].side, OrderSide::Buy);
1079
1080        assert_eq!(first.bids[2].price, Price::from("11655.78"));
1081        assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1082        assert_eq!(first.bids[2].side, OrderSide::Buy);
1083
1084        assert_eq!(first.bids[3].price, Price::from("11655.77"));
1085        assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1086        assert_eq!(first.bids[3].side, OrderSide::Buy);
1087
1088        assert_eq!(first.bids[4].price, Price::from("11655.68"));
1089        assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1090        assert_eq!(first.bids[4].side, OrderSide::Buy);
1091
1092        // Empty levels
1093        for i in 5..10 {
1094            assert_eq!(first.bids[i].price.raw, 0);
1095            assert_eq!(first.bids[i].size.raw, 0);
1096            assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1097        }
1098
1099        // Check all ask levels (first 5 from data, rest empty)
1100        assert_eq!(first.asks[0].price, Price::from("11657.08"));
1101        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1102        assert_eq!(first.asks[0].side, OrderSide::Sell);
1103
1104        assert_eq!(first.asks[1].price, Price::from("11657.54"));
1105        assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1106        assert_eq!(first.asks[1].side, OrderSide::Sell);
1107
1108        assert_eq!(first.asks[2].price, Price::from("11657.56"));
1109        assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1110        assert_eq!(first.asks[2].side, OrderSide::Sell);
1111
1112        assert_eq!(first.asks[3].price, Price::from("11657.61"));
1113        assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1114        assert_eq!(first.asks[3].side, OrderSide::Sell);
1115
1116        assert_eq!(first.asks[4].price, Price::from("11657.92"));
1117        assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1118        assert_eq!(first.asks[4].side, OrderSide::Sell);
1119
1120        // Empty levels
1121        for i in 5..10 {
1122            assert_eq!(first.asks[i].price.raw, 0);
1123            assert_eq!(first.asks[i].size.raw, 0);
1124            assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1125        }
1126
1127        // Logical checks: bid prices should decrease
1128        for i in 1..5 {
1129            assert!(
1130                first.bids[i].price < first.bids[i - 1].price,
1131                "Bid price at level {} should be less than level {}",
1132                i,
1133                i - 1
1134            );
1135        }
1136
1137        // Logical checks: ask prices should increase
1138        for i in 1..5 {
1139            assert!(
1140                first.asks[i].price > first.asks[i - 1].price,
1141                "Ask price at level {} should be greater than level {}",
1142                i,
1143                i - 1
1144            );
1145        }
1146
1147        // Logical check: spread should be positive
1148        assert!(
1149            first.asks[0].price > first.bids[0].price,
1150            "Best ask should be greater than best bid"
1151        );
1152
1153        // Check counts
1154        for i in 0..5 {
1155            assert_eq!(first.bid_counts[i], 1);
1156            assert_eq!(first.ask_counts[i], 1);
1157        }
1158        for i in 5..10 {
1159            assert_eq!(first.bid_counts[i], 0);
1160            assert_eq!(first.ask_counts[i], 0);
1161        }
1162
1163        // Check metadata - F_SNAPSHOT (32) | F_LAST (128) = 160
1164        assert_eq!(
1165            first.flags,
1166            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1167        );
1168        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1169        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1170        assert_eq!(first.sequence, 0);
1171    }
1172
1173    #[rstest]
1174    fn test_load_depth10_from_snapshot25_comprehensive() {
1175        let filepath = get_tardis_binance_snapshot25_path();
1176        let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1177
1178        assert_eq!(depths.len(), 10);
1179
1180        let first = &depths[0];
1181        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1182        assert_eq!(first.bids.len(), 10);
1183        assert_eq!(first.asks.len(), 10);
1184
1185        // Check all 10 bid levels from snapshot25
1186        let expected_bids = vec![
1187            ("11657.07", "10.896"),
1188            ("11656.97", "0.2"),
1189            ("11655.78", "0.2"),
1190            ("11655.77", "0.98"),
1191            ("11655.68", "0.111"),
1192            ("11655.66", "0.077"),
1193            ("11655.57", "0.34"),
1194            ("11655.48", "0.4"),
1195            ("11655.26", "1.185"),
1196            ("11654.86", "0.195"),
1197        ];
1198
1199        for (i, (price, size)) in expected_bids.iter().enumerate() {
1200            assert_eq!(first.bids[i].price, Price::from(*price));
1201            assert_eq!(first.bids[i].size, Quantity::from(*size));
1202            assert_eq!(first.bids[i].side, OrderSide::Buy);
1203        }
1204
1205        // Check all 10 ask levels from snapshot25
1206        let expected_asks = vec![
1207            ("11657.08", "1.714"),
1208            ("11657.54", "5.4"),
1209            ("11657.56", "0.238"),
1210            ("11657.61", "0.077"),
1211            ("11657.92", "0.918"),
1212            ("11658.09", "1.015"),
1213            ("11658.12", "0.665"),
1214            ("11658.19", "0.583"),
1215            ("11658.28", "0.255"),
1216            ("11658.29", "0.656"),
1217        ];
1218
1219        for (i, (price, size)) in expected_asks.iter().enumerate() {
1220            assert_eq!(first.asks[i].price, Price::from(*price));
1221            assert_eq!(first.asks[i].size, Quantity::from(*size));
1222            assert_eq!(first.asks[i].side, OrderSide::Sell);
1223        }
1224
1225        // Logical checks: bid prices should strictly decrease
1226        for i in 1..10 {
1227            assert!(
1228                first.bids[i].price < first.bids[i - 1].price,
1229                "Bid price at level {} ({}) should be less than level {} ({})",
1230                i,
1231                first.bids[i].price,
1232                i - 1,
1233                first.bids[i - 1].price
1234            );
1235        }
1236
1237        // Logical checks: ask prices should strictly increase
1238        for i in 1..10 {
1239            assert!(
1240                first.asks[i].price > first.asks[i - 1].price,
1241                "Ask price at level {} ({}) should be greater than level {} ({})",
1242                i,
1243                first.asks[i].price,
1244                i - 1,
1245                first.asks[i - 1].price
1246            );
1247        }
1248
1249        // Logical check: spread should be positive
1250        assert!(
1251            first.asks[0].price > first.bids[0].price,
1252            "Best ask ({}) should be greater than best bid ({})",
1253            first.asks[0].price,
1254            first.bids[0].price
1255        );
1256
1257        // Check counts (all should be 1 for snapshot data)
1258        for i in 0..10 {
1259            assert_eq!(first.bid_counts[i], 1);
1260            assert_eq!(first.ask_counts[i], 1);
1261        }
1262
1263        // Check metadata - F_SNAPSHOT (32) | F_LAST (128) = 160
1264        assert_eq!(
1265            first.flags,
1266            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1267        );
1268        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1269        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1270        assert_eq!(first.sequence, 0);
1271    }
1272
1273    #[rstest]
1274    fn test_snapshot_csv_field_order_interleaved() {
1275        // This test verifies that the CSV structs correctly handle the interleaved
1276        // asks/bids field ordering from Tardis CSV files
1277
1278        let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1279asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1280asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1281asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1282asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1283asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1284binance-futures,BTCUSDT,1000000,2000000,\
1285100.5,1.0,100.4,2.0,\
1286100.6,1.1,100.3,2.1,\
1287100.7,1.2,100.2,2.2,\
1288100.8,1.3,100.1,2.3,\
1289100.9,1.4,100.0,2.4";
1290
1291        let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1292        std::fs::write(&temp_file, csv_data).unwrap();
1293
1294        let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1295        assert_eq!(depths.len(), 1);
1296
1297        let depth = &depths[0];
1298
1299        // Verify bids are correctly parsed (should be decreasing)
1300        assert_eq!(depth.bids[0].price, Price::from("100.4"));
1301        assert_eq!(depth.bids[1].price, Price::from("100.3"));
1302        assert_eq!(depth.bids[2].price, Price::from("100.2"));
1303        assert_eq!(depth.bids[3].price, Price::from("100.1"));
1304        assert_eq!(depth.bids[4].price, Price::from("100.0"));
1305
1306        // Verify asks are correctly parsed (should be increasing)
1307        assert_eq!(depth.asks[0].price, Price::from("100.5"));
1308        assert_eq!(depth.asks[1].price, Price::from("100.6"));
1309        assert_eq!(depth.asks[2].price, Price::from("100.7"));
1310        assert_eq!(depth.asks[3].price, Price::from("100.8"));
1311        assert_eq!(depth.asks[4].price, Price::from("100.9"));
1312
1313        // Verify sizes
1314        assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1315        assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1316
1317        std::fs::remove_file(temp_file).unwrap();
1318    }
1319
    #[rstest]
    fn test_load_deltas_limit_includes_clear_deltas() {
        // Test that limit counts total emitted deltas (including CLEARs)
        // When limit=5, we should get exactly 5 deltas: 1 CLEAR + 4 data deltas
        // Data rows: 2 snapshot rows (bid 50000.0, ask 50001.0) then 5 update rows
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,2.0
binance-futures,BTCUSDT,1640995205000000,1640995205100000,false,bid,49997.0,0.5";

        let temp_file = std::env::temp_dir().join("test_load_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Load with limit=5 (should emit exactly 5 deltas including CLEAR)
        let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(5)).unwrap();

        // Should have exactly 5 deltas: 1 CLEAR + 4 data deltas
        assert_eq!(deltas.len(), 5);
        assert_eq!(deltas[0].action, BookAction::Clear);
        assert_eq!(deltas[1].action, BookAction::Add);
        assert_eq!(deltas[2].action, BookAction::Add);
        assert_eq!(deltas[3].action, BookAction::Update);
        assert_eq!(deltas[4].action, BookAction::Update);

        // Verify deltas[3] carries the values of the 3rd CSV data row (49999.0 bid),
        // i.e. the first non-snapshot update
        assert_eq!(deltas[3].order.price, parse_price(49999.0, 1));

        std::fs::remove_file(&temp_file).ok();
    }
1352
1353    #[rstest]
1354    fn test_load_deltas_limit_stops_at_clear() {
1355        // Test that limit=1 with snapshot data returns only the CLEAR delta
1356        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1357binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1358binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
1359
1360        let temp_file = std::env::temp_dir().join("test_load_deltas_limit_stops_at_clear.csv");
1361        std::fs::write(&temp_file, csv_data).unwrap();
1362
1363        // Load with limit=1 should only get the CLEAR delta
1364        let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(1)).unwrap();
1365
1366        assert_eq!(deltas.len(), 1);
1367        assert_eq!(deltas[0].action, BookAction::Clear);
1368
1369        std::fs::remove_file(&temp_file).ok();
1370    }
1371
1372    #[rstest]
1373    fn test_load_deltas_limit_with_mid_day_snapshot() {
1374        // Test limit behavior when there's a mid-day snapshot
1375        // The limit counts total emitted deltas including CLEARs
1376        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
1377        let deltas = load_deltas(filepath, Some(1), Some(1), None, Some(5)).unwrap();
1378
1379        // With limit=5, we get exactly 5 deltas
1380        // First snapshot inserts CLEAR, then we get 4 more data deltas
1381        assert_eq!(deltas.len(), 5);
1382        assert_eq!(deltas[0].action, BookAction::Clear);
1383    }
1384}