Skip to main content

nautilus_tardis/csv/
load.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick,
24    },
25    enums::{OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31    csv::{
32        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34        record::{
35            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37        },
38    },
39    parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43    if explicit.is_some() {
44        return false;
45    }
46
47    let inferred = infer_precision(value).min(FIXED_PRECISION);
48    if inferred > *current {
49        *current = inferred;
50        true
51    } else {
52        false
53    }
54}
55
56fn update_deltas_precision(
57    deltas: &mut [OrderBookDelta],
58    price_precision: Option<u8>,
59    size_precision: Option<u8>,
60    current_price_precision: u8,
61    current_size_precision: u8,
62) {
63    for delta in deltas {
64        if price_precision.is_none() {
65            delta.order.price.precision = current_price_precision;
66        }
67        if size_precision.is_none() {
68            delta.order.size.precision = current_size_precision;
69        }
70    }
71}
72
73fn update_quotes_precision(
74    quotes: &mut [QuoteTick],
75    price_precision: Option<u8>,
76    size_precision: Option<u8>,
77    current_price_precision: u8,
78    current_size_precision: u8,
79) {
80    for quote in quotes {
81        if price_precision.is_none() {
82            quote.bid_price.precision = current_price_precision;
83            quote.ask_price.precision = current_price_precision;
84        }
85        if size_precision.is_none() {
86            quote.bid_size.precision = current_size_precision;
87            quote.ask_size.precision = current_size_precision;
88        }
89    }
90}
91
92fn update_trades_precision(
93    trades: &mut [TradeTick],
94    price_precision: Option<u8>,
95    size_precision: Option<u8>,
96    current_price_precision: u8,
97    current_size_precision: u8,
98) {
99    for trade in trades {
100        if price_precision.is_none() {
101            trade.price.precision = current_price_precision;
102        }
103        if size_precision.is_none() {
104            trade.size.precision = current_size_precision;
105        }
106    }
107}
108
/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// A synthetic CLEAR delta is inserted whenever a snapshot section begins, so
/// downstream consumers reset book state before applying snapshot rows. When
/// `price_precision` / `size_precision` are `None`, precisions are inferred
/// from the data and applied retroactively to all deltas once reading completes.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
pub fn load_deltas<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);

    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut last_ts_event = UnixNanos::default();
    let mut last_is_snapshot = false;

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        if let Some(limit) = limit
            && deltas.len() >= limit
        {
            break;
        }

        let data: TardisBookUpdateRecord = record.deserialize(None)?;

        // Grow working precisions if this row reveals finer granularity
        update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
        update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);

        // Insert CLEAR on snapshot boundary to reset order book state
        if data.is_snapshot && !last_is_snapshot {
            let clear_instrument_id =
                instrument_id.unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
            let ts_event = parse_timestamp(data.timestamp);
            let ts_init = parse_timestamp(data.local_timestamp);

            // Close out the previous event-time batch with F_LAST
            if last_ts_event != ts_event
                && let Some(last_delta) = deltas.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            last_ts_event = ts_event;

            let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
            deltas.push(clear_delta);

            if let Some(limit) = limit
                && deltas.len() >= limit
            {
                break;
            }
        }
        last_is_snapshot = data.is_snapshot;

        // Malformed rows are skipped with a warning rather than aborting the load
        let delta = match parse_delta_record(
            &data,
            current_price_precision,
            current_size_precision,
            instrument_id,
        ) {
            Ok(d) => d,
            Err(e) => {
                log::warn!("Skipping invalid delta record: {e}");
                continue;
            }
        };

        // When the event timestamp advances, the previous delta closed its
        // batch: mark it F_LAST so same-ts_event groups are delimited
        let ts_event = delta.ts_event;
        if last_ts_event != ts_event
            && let Some(last_delta) = deltas.last_mut()
        {
            last_delta.flags = RecordFlag::F_LAST.value();
        }

        last_ts_event = ts_event;

        deltas.push(delta);
    }

    // Set F_LAST flag for final delta
    if let Some(last_delta) = deltas.last_mut() {
        last_delta.flags = RecordFlag::F_LAST.value();
    }

    // Update all deltas to use the final (maximum) precision discovered
    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
    update_deltas_precision(
        &mut deltas,
        price_precision,
        size_precision,
        current_price_precision,
        current_size_precision,
    );

    Ok(deltas)
}
218
219/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
220/// automatically applying `GZip` decompression for files ending in ".gz".
221/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
222///
223/// # Errors
224///
225/// Returns an error if the file cannot be opened, read, or parsed as CSV.
226///
227/// # Panics
228///
229/// Panics if a record level cannot be parsed to depth-10.
230pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
231    filepath: P,
232    price_precision: Option<u8>,
233    size_precision: Option<u8>,
234    instrument_id: Option<InstrumentId>,
235    limit: Option<usize>,
236) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
237    // Estimate capacity for Vec pre-allocation
238    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
239    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
240
241    let mut current_price_precision = price_precision.unwrap_or(0);
242    let mut current_size_precision = size_precision.unwrap_or(0);
243
244    let mut reader = create_csv_reader(filepath)?;
245    let mut record = StringRecord::new();
246
247    while reader.read_record(&mut record)? {
248        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
249
250        // Update precisions dynamically if not explicitly set
251        let mut precision_updated = false;
252
253        if price_precision.is_none()
254            && let Some(bid_price) = data.bids_0_price
255        {
256            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
257            if inferred_price_precision > current_price_precision {
258                current_price_precision = inferred_price_precision;
259                precision_updated = true;
260            }
261        }
262
263        if size_precision.is_none()
264            && let Some(bid_amount) = data.bids_0_amount
265        {
266            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
267            if inferred_size_precision > current_size_precision {
268                current_size_precision = inferred_size_precision;
269                precision_updated = true;
270            }
271        }
272
273        // If precision increased, update all previous depths
274        if precision_updated {
275            for depth in &mut depths {
276                for i in 0..DEPTH10_LEN {
277                    if price_precision.is_none() {
278                        depth.bids[i].price.precision = current_price_precision;
279                        depth.asks[i].price.precision = current_price_precision;
280                    }
281                    if size_precision.is_none() {
282                        depth.bids[i].size.precision = current_size_precision;
283                        depth.asks[i].size.precision = current_size_precision;
284                    }
285                }
286            }
287        }
288
289        let instrument_id = match &instrument_id {
290            Some(id) => *id,
291            None => parse_instrument_id(&data.exchange, data.symbol),
292        };
293        // Mark as both snapshot and last (consistent with streaming implementation)
294        let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
295        let sequence = 0; // Sequence not available
296        let ts_event = parse_timestamp(data.timestamp);
297        let ts_init = parse_timestamp(data.local_timestamp);
298
299        // Initialize empty arrays
300        let mut bids = [NULL_ORDER; DEPTH10_LEN];
301        let mut asks = [NULL_ORDER; DEPTH10_LEN];
302        let mut bid_counts = [0u32; DEPTH10_LEN];
303        let mut ask_counts = [0u32; DEPTH10_LEN];
304
305        for i in 0..=4 {
306            // Create bids
307            let (bid_order, bid_count) = create_book_order(
308                OrderSide::Buy,
309                match i {
310                    0 => data.bids_0_price,
311                    1 => data.bids_1_price,
312                    2 => data.bids_2_price,
313                    3 => data.bids_3_price,
314                    4 => data.bids_4_price,
315                    _ => unreachable!("i is constrained to 0..=4 by loop"),
316                },
317                match i {
318                    0 => data.bids_0_amount,
319                    1 => data.bids_1_amount,
320                    2 => data.bids_2_amount,
321                    3 => data.bids_3_amount,
322                    4 => data.bids_4_amount,
323                    _ => unreachable!("i is constrained to 0..=4 by loop"),
324                },
325                current_price_precision,
326                current_size_precision,
327            );
328            bids[i] = bid_order;
329            bid_counts[i] = bid_count;
330
331            // Create asks
332            let (ask_order, ask_count) = create_book_order(
333                OrderSide::Sell,
334                match i {
335                    0 => data.asks_0_price,
336                    1 => data.asks_1_price,
337                    2 => data.asks_2_price,
338                    3 => data.asks_3_price,
339                    4 => data.asks_4_price,
340                    _ => None, // Unreachable, but for safety
341                },
342                match i {
343                    0 => data.asks_0_amount,
344                    1 => data.asks_1_amount,
345                    2 => data.asks_2_amount,
346                    3 => data.asks_3_amount,
347                    4 => data.asks_4_amount,
348                    _ => None, // Unreachable, but for safety
349                },
350                current_price_precision,
351                current_size_precision,
352            );
353            asks[i] = ask_order;
354            ask_counts[i] = ask_count;
355        }
356
357        let depth = OrderBookDepth10::new(
358            instrument_id,
359            bids,
360            asks,
361            bid_counts,
362            ask_counts,
363            flags,
364            sequence,
365            ts_event,
366            ts_init,
367        );
368
369        depths.push(depth);
370
371        if let Some(limit) = limit
372            && depths.len() >= limit
373        {
374            break;
375        }
376    }
377
378    Ok(depths)
379}
380
381/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
382/// automatically applying `GZip` decompression for files ending in ".gz".
383/// Load order book depth-10 snapshots (25-level) from a CSV or gzipped CSV file.
384///
385/// # Errors
386///
387/// Returns an error if the file cannot be opened, read, or parsed as CSV.
388pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
389    filepath: P,
390    price_precision: Option<u8>,
391    size_precision: Option<u8>,
392    instrument_id: Option<InstrumentId>,
393    limit: Option<usize>,
394) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
395    // Estimate capacity for Vec pre-allocation
396    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
397    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
398
399    let mut current_price_precision = price_precision.unwrap_or(0);
400    let mut current_size_precision = size_precision.unwrap_or(0);
401    let mut reader = create_csv_reader(filepath)?;
402    let mut record = StringRecord::new();
403
404    while reader.read_record(&mut record)? {
405        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
406
407        // Update precisions dynamically if not explicitly set
408        let mut precision_updated = false;
409
410        if price_precision.is_none()
411            && let Some(bid_price) = data.bids_0_price
412        {
413            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
414            if inferred_price_precision > current_price_precision {
415                current_price_precision = inferred_price_precision;
416                precision_updated = true;
417            }
418        }
419
420        if size_precision.is_none()
421            && let Some(bid_amount) = data.bids_0_amount
422        {
423            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
424            if inferred_size_precision > current_size_precision {
425                current_size_precision = inferred_size_precision;
426                precision_updated = true;
427            }
428        }
429
430        // If precision increased, update all previous depths
431        if precision_updated {
432            for depth in &mut depths {
433                for i in 0..DEPTH10_LEN {
434                    if price_precision.is_none() {
435                        depth.bids[i].price.precision = current_price_precision;
436                        depth.asks[i].price.precision = current_price_precision;
437                    }
438                    if size_precision.is_none() {
439                        depth.bids[i].size.precision = current_size_precision;
440                        depth.asks[i].size.precision = current_size_precision;
441                    }
442                }
443            }
444        }
445
446        let instrument_id = match &instrument_id {
447            Some(id) => *id,
448            None => parse_instrument_id(&data.exchange, data.symbol),
449        };
450        // Mark as both snapshot and last (consistent with streaming implementation)
451        let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
452        let sequence = 0; // Sequence not available
453        let ts_event = parse_timestamp(data.timestamp);
454        let ts_init = parse_timestamp(data.local_timestamp);
455
456        // Initialize empty arrays for the first 10 levels only
457        let mut bids = [NULL_ORDER; DEPTH10_LEN];
458        let mut asks = [NULL_ORDER; DEPTH10_LEN];
459        let mut bid_counts = [0u32; DEPTH10_LEN];
460        let mut ask_counts = [0u32; DEPTH10_LEN];
461
462        // Fill only the first 10 levels from the 25-level record
463        for i in 0..DEPTH10_LEN {
464            // Create bids
465            let (bid_order, bid_count) = create_book_order(
466                OrderSide::Buy,
467                match i {
468                    0 => data.bids_0_price,
469                    1 => data.bids_1_price,
470                    2 => data.bids_2_price,
471                    3 => data.bids_3_price,
472                    4 => data.bids_4_price,
473                    5 => data.bids_5_price,
474                    6 => data.bids_6_price,
475                    7 => data.bids_7_price,
476                    8 => data.bids_8_price,
477                    9 => data.bids_9_price,
478                    _ => unreachable!("i is constrained to 0..10 by loop"),
479                },
480                match i {
481                    0 => data.bids_0_amount,
482                    1 => data.bids_1_amount,
483                    2 => data.bids_2_amount,
484                    3 => data.bids_3_amount,
485                    4 => data.bids_4_amount,
486                    5 => data.bids_5_amount,
487                    6 => data.bids_6_amount,
488                    7 => data.bids_7_amount,
489                    8 => data.bids_8_amount,
490                    9 => data.bids_9_amount,
491                    _ => unreachable!("i is constrained to 0..10 by loop"),
492                },
493                current_price_precision,
494                current_size_precision,
495            );
496            bids[i] = bid_order;
497            bid_counts[i] = bid_count;
498
499            // Create asks
500            let (ask_order, ask_count) = create_book_order(
501                OrderSide::Sell,
502                match i {
503                    0 => data.asks_0_price,
504                    1 => data.asks_1_price,
505                    2 => data.asks_2_price,
506                    3 => data.asks_3_price,
507                    4 => data.asks_4_price,
508                    5 => data.asks_5_price,
509                    6 => data.asks_6_price,
510                    7 => data.asks_7_price,
511                    8 => data.asks_8_price,
512                    9 => data.asks_9_price,
513                    _ => unreachable!("i is constrained to 0..10 by loop"),
514                },
515                match i {
516                    0 => data.asks_0_amount,
517                    1 => data.asks_1_amount,
518                    2 => data.asks_2_amount,
519                    3 => data.asks_3_amount,
520                    4 => data.asks_4_amount,
521                    5 => data.asks_5_amount,
522                    6 => data.asks_6_amount,
523                    7 => data.asks_7_amount,
524                    8 => data.asks_8_amount,
525                    9 => data.asks_9_amount,
526                    _ => unreachable!("i is constrained to 0..10 by loop"),
527                },
528                current_price_precision,
529                current_size_precision,
530            );
531            asks[i] = ask_order;
532            ask_counts[i] = ask_count;
533        }
534
535        let depth = OrderBookDepth10::new(
536            instrument_id,
537            bids,
538            asks,
539            bid_counts,
540            ask_counts,
541            flags,
542            sequence,
543            ts_event,
544            ts_init,
545        );
546
547        depths.push(depth);
548
549        if let Some(limit) = limit
550            && depths.len() >= limit
551        {
552            break;
553        }
554    }
555
556    Ok(depths)
557}
558
559/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
560/// automatically applying `GZip` decompression for files ending in ".gz".
561/// Load quote ticks from a CSV or gzipped CSV file.
562///
563/// # Errors
564///
565/// Returns an error if the file cannot be opened, read, or parsed as CSV.
566///
567/// # Panics
568///
569/// Panics if a record has invalid data or CSV parsing errors.
570pub fn load_quotes<P: AsRef<Path>>(
571    filepath: P,
572    price_precision: Option<u8>,
573    size_precision: Option<u8>,
574    instrument_id: Option<InstrumentId>,
575    limit: Option<usize>,
576) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
577    // Estimate capacity for Vec pre-allocation
578    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
579    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
580
581    let mut current_price_precision = price_precision.unwrap_or(0);
582    let mut current_size_precision = size_precision.unwrap_or(0);
583    let mut reader = create_csv_reader(filepath)?;
584    let mut record = StringRecord::new();
585
586    while reader.read_record(&mut record)? {
587        let data: TardisQuoteRecord = record.deserialize(None)?;
588
589        if price_precision.is_none()
590            && let Some(bid_price) = data.bid_price
591        {
592            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
593            if inferred_price_precision > current_price_precision {
594                current_price_precision = inferred_price_precision;
595            }
596        }
597
598        if size_precision.is_none()
599            && let Some(bid_amount) = data.bid_amount
600        {
601            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
602            if inferred_size_precision > current_size_precision {
603                current_size_precision = inferred_size_precision;
604            }
605        }
606
607        let quote = parse_quote_record(
608            &data,
609            current_price_precision,
610            current_size_precision,
611            instrument_id,
612        );
613
614        quotes.push(quote);
615
616        if let Some(limit) = limit
617            && quotes.len() >= limit
618        {
619            break;
620        }
621    }
622
623    // Update all quotes to use the final (maximum) precision discovered
624    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
625    update_quotes_precision(
626        &mut quotes,
627        price_precision,
628        size_precision,
629        current_price_precision,
630        current_size_precision,
631    );
632
633    Ok(quotes)
634}
635
636/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
637/// automatically applying `GZip` decompression for files ending in ".gz".
638/// Load trade ticks from a CSV or gzipped CSV file.
639///
640/// # Errors
641///
642/// Returns an error if the file cannot be opened, read, or parsed as CSV.
643///
644/// # Panics
645///
646/// Panics if a record has invalid trade size or CSV parsing errors.
647pub fn load_trades<P: AsRef<Path>>(
648    filepath: P,
649    price_precision: Option<u8>,
650    size_precision: Option<u8>,
651    instrument_id: Option<InstrumentId>,
652    limit: Option<usize>,
653) -> Result<Vec<TradeTick>, Box<dyn Error>> {
654    // Estimate capacity for Vec pre-allocation
655    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
656    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
657
658    let mut current_price_precision = price_precision.unwrap_or(0);
659    let mut current_size_precision = size_precision.unwrap_or(0);
660    let mut reader = create_csv_reader(filepath)?;
661    let mut record = StringRecord::new();
662
663    while reader.read_record(&mut record)? {
664        let data: TardisTradeRecord = record.deserialize(None)?;
665
666        if price_precision.is_none() {
667            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
668            if inferred_price_precision > current_price_precision {
669                current_price_precision = inferred_price_precision;
670            }
671        }
672
673        if size_precision.is_none() {
674            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
675            if inferred_size_precision > current_size_precision {
676                current_size_precision = inferred_size_precision;
677            }
678        }
679
680        let size = Quantity::new_checked(data.amount, current_size_precision)?;
681
682        if size.is_positive() {
683            let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
684
685            trades.push(trade);
686
687            if let Some(limit) = limit
688                && trades.len() >= limit
689            {
690                break;
691            }
692        } else {
693            log::warn!("Skipping zero-sized trade: {data:?}");
694        }
695    }
696
697    // Update all trades to use the final (maximum) precision discovered
698    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
699    update_trades_precision(
700        &mut trades,
701        price_precision,
702        size_precision,
703        current_price_precision,
704        current_size_precision,
705    );
706
707    Ok(trades)
708}
709
710/// Loads [`FundingRateUpdate`]s from a Tardis format derivative ticker CSV at the given `filepath`,
711/// automatically applying `GZip` decompression for files ending in ".gz".
712///
713/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
714/// fields from derivative ticker data to create funding rate updates.
715///
716/// # Errors
717///
718/// Returns an error if the file cannot be opened, read, or parsed as CSV.
719pub fn load_funding_rates<P: AsRef<Path>>(
720    filepath: P,
721    instrument_id: Option<InstrumentId>,
722    limit: Option<usize>,
723) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
724    // Estimate capacity for Vec pre-allocation
725    let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
726    let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
727
728    let mut reader = create_csv_reader(filepath)?;
729    let mut record = StringRecord::new();
730
731    while reader.read_record(&mut record)? {
732        let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
733
734        // Parse to funding rate update (returns None if no funding data)
735        if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
736            funding_rates.push(funding_rate);
737
738            if let Some(limit) = limit
739                && funding_rates.len() >= limit
740            {
741                break;
742            }
743        }
744    }
745
746    Ok(funding_rates)
747}
748
749#[cfg(test)]
750mod tests {
751    use std::{fs, fs::File, sync::Arc};
752
753    use nautilus_core::paths::get_test_data_path as get_test_data_root;
754    use nautilus_model::{
755        enums::{AggressorSide, BookAction},
756        identifiers::{InstrumentId, TradeId},
757        types::Price,
758    };
759    use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
760    use nautilus_testkit::common::{
761        get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
762        get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
763        get_tardis_huobi_quotes_path,
764    };
765    use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
766    use rstest::*;
767
768    use super::*;
769    use crate::{common::testing::get_test_data_path, parse::parse_price};
770
771    #[rstest]
772    #[case(0.0, 0)]
773    #[case(42.0, 0)]
774    #[case(0.1, 1)]
775    #[case(0.25, 2)]
776    #[case(123.0001, 4)]
777    #[case(-42.987654321,       9)]
778    #[case(1.234_567_890_123, 12)]
779    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
780        assert_eq!(infer_precision(input), expected);
781    }
782
783    #[rstest]
784    pub fn test_dynamic_precision_inference() {
785        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
786binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
787binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
788binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
789binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
790binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
791
792        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
793        std::fs::write(&temp_file, csv_data).unwrap();
794
795        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
796
797        // 5 data rows + 1 CLEAR delta at start (first row is snapshot)
798        assert_eq!(deltas.len(), 6);
799
800        // Skip the CLEAR delta at index 0
801        for (i, delta) in deltas.iter().skip(1).enumerate() {
802            assert_eq!(
803                delta.order.price.precision, 4,
804                "Price precision should be 4 for delta {i}",
805            );
806            assert_eq!(
807                delta.order.size.precision, 1,
808                "Size precision should be 1 for delta {i}",
809            );
810        }
811
812        // Test exact values to ensure retroactive precision updates work correctly
813        // Index 0 is CLEAR, data starts at index 1
814        assert_eq!(deltas[0].action, BookAction::Clear);
815
816        assert_eq!(deltas[1].order.price, parse_price(50000.0, 4));
817        assert_eq!(deltas[1].order.size, Quantity::new(1.0, 1));
818
819        assert_eq!(deltas[2].order.price, parse_price(49999.5, 4));
820        assert_eq!(deltas[2].order.size, Quantity::new(2.0, 1));
821
822        assert_eq!(deltas[3].order.price, parse_price(50000.12, 4));
823        assert_eq!(deltas[3].order.size, Quantity::new(1.5, 1));
824
825        assert_eq!(deltas[4].order.price, parse_price(49999.123, 4));
826        assert_eq!(deltas[4].order.size, Quantity::new(3.0, 1));
827
828        assert_eq!(deltas[5].order.price, parse_price(50000.1234, 4));
829        assert_eq!(deltas[5].order.size, Quantity::new(0.5, 1));
830
831        assert_eq!(
832            deltas[1].order.price.precision,
833            deltas[5].order.price.precision
834        );
835        assert_eq!(
836            deltas[1].order.size.precision,
837            deltas[3].order.size.precision
838        );
839
840        std::fs::remove_file(&temp_file).ok();
841    }
842
843    #[rstest]
844    #[case(Some(1), Some(0))] // Explicit precisions
845    #[case(None, None)] // Inferred precisions
846    pub fn test_read_deltas(
847        #[case] price_precision: Option<u8>,
848        #[case] size_precision: Option<u8>,
849    ) {
850        let filepath = get_tardis_deribit_book_l2_path();
851        let deltas =
852            load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
853
854        // 15 data rows + 1 CLEAR delta at start (first row is snapshot)
855        assert_eq!(deltas.len(), 16);
856
857        // Index 0 is CLEAR delta
858        assert_eq!(deltas[0].action, BookAction::Clear);
859
860        // Index 1 is first data delta
861        assert_eq!(
862            deltas[1].instrument_id,
863            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
864        );
865        assert_eq!(deltas[1].action, BookAction::Add);
866        assert_eq!(deltas[1].order.side, OrderSide::Sell);
867        assert_eq!(deltas[1].order.price, Price::from("6421.5"));
868        assert_eq!(deltas[1].order.size, Quantity::from("18640"));
869        assert_eq!(deltas[1].flags, 0);
870        assert_eq!(deltas[1].sequence, 0);
871        assert_eq!(deltas[1].ts_event, 1585699200245000000);
872        assert_eq!(deltas[1].ts_init, 1585699200355684000);
873    }
874
875    #[rstest]
876    #[case(Some(2), Some(3))] // Explicit precisions
877    #[case(None, None)] // Inferred precisions
878    pub fn test_read_depth10s_from_snapshot5(
879        #[case] price_precision: Option<u8>,
880        #[case] size_precision: Option<u8>,
881    ) {
882        let filepath = get_tardis_binance_snapshot5_path();
883        let depths =
884            load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
885                .unwrap();
886
887        assert_eq!(depths.len(), 10);
888        assert_eq!(
889            depths[0].instrument_id,
890            InstrumentId::from("BTCUSDT.BINANCE")
891        );
892        assert_eq!(depths[0].bids.len(), 10);
893        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
894        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
895        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
896        assert_eq!(depths[0].bids[0].order_id, 0);
897        assert_eq!(depths[0].asks.len(), 10);
898        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
899        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
900        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
901        assert_eq!(depths[0].asks[0].order_id, 0);
902        assert_eq!(depths[0].bid_counts[0], 1);
903        assert_eq!(depths[0].ask_counts[0], 1);
904        // F_SNAPSHOT (32) | F_LAST (128) = 160
905        assert_eq!(
906            depths[0].flags,
907            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
908        );
909        assert_eq!(depths[0].ts_event, 1598918403696000000);
910        assert_eq!(depths[0].ts_init, 1598918403810979000);
911        assert_eq!(depths[0].sequence, 0);
912    }
913
914    #[rstest]
915    #[case(Some(2), Some(3))] // Explicit precisions
916    #[case(None, None)] // Inferred precisions
917    pub fn test_read_depth10s_from_snapshot25(
918        #[case] price_precision: Option<u8>,
919        #[case] size_precision: Option<u8>,
920    ) {
921        let filepath = get_tardis_binance_snapshot25_path();
922        let depths = load_depth10_from_snapshot25(
923            filepath,
924            price_precision,
925            size_precision,
926            None,
927            Some(100),
928        )
929        .unwrap();
930
931        assert_eq!(depths.len(), 10);
932        assert_eq!(
933            depths[0].instrument_id,
934            InstrumentId::from("BTCUSDT.BINANCE")
935        );
936        assert_eq!(depths[0].bids.len(), 10);
937        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
938        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
939        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
940        assert_eq!(depths[0].bids[0].order_id, 0);
941        assert_eq!(depths[0].asks.len(), 10);
942        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
943        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
944        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
945        assert_eq!(depths[0].asks[0].order_id, 0);
946        assert_eq!(depths[0].bid_counts[0], 1);
947        assert_eq!(depths[0].ask_counts[0], 1);
948        // F_SNAPSHOT (32) | F_LAST (128) = 160
949        assert_eq!(
950            depths[0].flags,
951            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
952        );
953        assert_eq!(depths[0].ts_event, 1598918403696000000);
954        assert_eq!(depths[0].ts_init, 1598918403810979000);
955        assert_eq!(depths[0].sequence, 0);
956    }
957
958    #[rstest]
959    #[case(Some(1), Some(0))] // Explicit precisions
960    #[case(None, None)] // Inferred precisions
961    pub fn test_read_quotes(
962        #[case] price_precision: Option<u8>,
963        #[case] size_precision: Option<u8>,
964    ) {
965        let filepath = get_tardis_huobi_quotes_path();
966        let quotes =
967            load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
968
969        assert_eq!(quotes.len(), 10);
970        assert_eq!(
971            quotes[0].instrument_id,
972            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
973        );
974        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
975        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
976        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
977        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
978        assert_eq!(quotes[0].ts_event, 1588291201099000000);
979        assert_eq!(quotes[0].ts_init, 1588291201234268000);
980    }
981
982    #[rstest]
983    #[case(Some(1), Some(0))] // Explicit precisions
984    #[case(None, None)] // Inferred precisions
985    pub fn test_read_trades(
986        #[case] price_precision: Option<u8>,
987        #[case] size_precision: Option<u8>,
988    ) {
989        let filepath = get_tardis_bitmex_trades_path();
990        let trades =
991            load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
992
993        assert_eq!(trades.len(), 10);
994        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
995        assert_eq!(trades[0].price, Price::from("8531.5"));
996        assert_eq!(trades[0].size, Quantity::from("2152"));
997        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
998        assert_eq!(
999            trades[0].trade_id,
1000            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1001        );
1002        assert_eq!(trades[0].ts_event, 1583020803145000000);
1003        assert_eq!(trades[0].ts_init, 1583020803307160000);
1004    }
1005
1006    #[rstest]
1007    pub fn test_load_trades_with_zero_sized_trade() {
1008        // Create test CSV data with one zero-sized trade that should be skipped
1009        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1010binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1011binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1012binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1013binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1014
1015        let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1016        std::fs::write(&temp_file, csv_data).unwrap();
1017
1018        let trades = load_trades(
1019            &temp_file,
1020            Some(4),
1021            Some(1),
1022            None,
1023            None, // No limit, load all
1024        )
1025        .unwrap();
1026
1027        // Should have 3 trades (zero-sized trade skipped)
1028        assert_eq!(trades.len(), 3);
1029
1030        // Verify the correct trades were loaded (not the zero-sized one)
1031        assert_eq!(trades[0].size, Quantity::from("1.0"));
1032        assert_eq!(trades[1].size, Quantity::from("1.5"));
1033        assert_eq!(trades[2].size, Quantity::from("3.0"));
1034
1035        // Verify trade IDs to confirm correct trades were loaded
1036        assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1037        assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1038        assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1039
1040        std::fs::remove_file(&temp_file).ok();
1041    }
1042
1043    #[rstest]
1044    pub fn test_load_trades_from_local_file() {
1045        let filepath = get_test_data_path("csv/trades_1.csv");
1046        let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1047        assert_eq!(trades.len(), 2);
1048        assert_eq!(trades[0].price, Price::from("8531.5"));
1049        assert_eq!(trades[1].size, Quantity::from("1000"));
1050    }
1051
1052    #[rstest]
1053    pub fn test_load_deltas_from_local_file() {
1054        let filepath = get_test_data_path("csv/deltas_1.csv");
1055        let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1056
1057        // 2 data rows + 1 CLEAR delta at start (first row is snapshot)
1058        assert_eq!(deltas.len(), 3);
1059        assert_eq!(deltas[0].action, BookAction::Clear);
1060        assert_eq!(deltas[1].order.price, Price::from("6421.5"));
1061        assert_eq!(deltas[2].order.size, Quantity::from("10000"));
1062    }
1063
1064    #[rstest]
1065    fn test_load_depth10_from_snapshot5_comprehensive() {
1066        let filepath = get_tardis_binance_snapshot5_path();
1067        let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1068
1069        assert_eq!(depths.len(), 10);
1070
1071        let first = &depths[0];
1072        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1073        assert_eq!(first.bids.len(), 10);
1074        assert_eq!(first.asks.len(), 10);
1075
1076        // Check all bid levels (first 5 from data, rest empty)
1077        assert_eq!(first.bids[0].price, Price::from("11657.07"));
1078        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1079        assert_eq!(first.bids[0].side, OrderSide::Buy);
1080
1081        assert_eq!(first.bids[1].price, Price::from("11656.97"));
1082        assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1083        assert_eq!(first.bids[1].side, OrderSide::Buy);
1084
1085        assert_eq!(first.bids[2].price, Price::from("11655.78"));
1086        assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1087        assert_eq!(first.bids[2].side, OrderSide::Buy);
1088
1089        assert_eq!(first.bids[3].price, Price::from("11655.77"));
1090        assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1091        assert_eq!(first.bids[3].side, OrderSide::Buy);
1092
1093        assert_eq!(first.bids[4].price, Price::from("11655.68"));
1094        assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1095        assert_eq!(first.bids[4].side, OrderSide::Buy);
1096
1097        // Empty levels
1098        for i in 5..10 {
1099            assert_eq!(first.bids[i].price.raw, 0);
1100            assert_eq!(first.bids[i].size.raw, 0);
1101            assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1102        }
1103
1104        // Check all ask levels (first 5 from data, rest empty)
1105        assert_eq!(first.asks[0].price, Price::from("11657.08"));
1106        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1107        assert_eq!(first.asks[0].side, OrderSide::Sell);
1108
1109        assert_eq!(first.asks[1].price, Price::from("11657.54"));
1110        assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1111        assert_eq!(first.asks[1].side, OrderSide::Sell);
1112
1113        assert_eq!(first.asks[2].price, Price::from("11657.56"));
1114        assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1115        assert_eq!(first.asks[2].side, OrderSide::Sell);
1116
1117        assert_eq!(first.asks[3].price, Price::from("11657.61"));
1118        assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1119        assert_eq!(first.asks[3].side, OrderSide::Sell);
1120
1121        assert_eq!(first.asks[4].price, Price::from("11657.92"));
1122        assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1123        assert_eq!(first.asks[4].side, OrderSide::Sell);
1124
1125        // Empty levels
1126        for i in 5..10 {
1127            assert_eq!(first.asks[i].price.raw, 0);
1128            assert_eq!(first.asks[i].size.raw, 0);
1129            assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1130        }
1131
1132        // Logical checks: bid prices should decrease
1133        for i in 1..5 {
1134            assert!(
1135                first.bids[i].price < first.bids[i - 1].price,
1136                "Bid price at level {} should be less than level {}",
1137                i,
1138                i - 1
1139            );
1140        }
1141
1142        // Logical checks: ask prices should increase
1143        for i in 1..5 {
1144            assert!(
1145                first.asks[i].price > first.asks[i - 1].price,
1146                "Ask price at level {} should be greater than level {}",
1147                i,
1148                i - 1
1149            );
1150        }
1151
1152        // Logical check: spread should be positive
1153        assert!(
1154            first.asks[0].price > first.bids[0].price,
1155            "Best ask should be greater than best bid"
1156        );
1157
1158        // Check counts
1159        for i in 0..5 {
1160            assert_eq!(first.bid_counts[i], 1);
1161            assert_eq!(first.ask_counts[i], 1);
1162        }
1163        for i in 5..10 {
1164            assert_eq!(first.bid_counts[i], 0);
1165            assert_eq!(first.ask_counts[i], 0);
1166        }
1167
1168        // Check metadata - F_SNAPSHOT (32) | F_LAST (128) = 160
1169        assert_eq!(
1170            first.flags,
1171            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1172        );
1173        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1174        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1175        assert_eq!(first.sequence, 0);
1176    }
1177
1178    #[rstest]
1179    fn test_load_depth10_from_snapshot25_comprehensive() {
1180        let filepath = get_tardis_binance_snapshot25_path();
1181        let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1182
1183        assert_eq!(depths.len(), 10);
1184
1185        let first = &depths[0];
1186        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1187        assert_eq!(first.bids.len(), 10);
1188        assert_eq!(first.asks.len(), 10);
1189
1190        // Check all 10 bid levels from snapshot25
1191        let expected_bids = vec![
1192            ("11657.07", "10.896"),
1193            ("11656.97", "0.2"),
1194            ("11655.78", "0.2"),
1195            ("11655.77", "0.98"),
1196            ("11655.68", "0.111"),
1197            ("11655.66", "0.077"),
1198            ("11655.57", "0.34"),
1199            ("11655.48", "0.4"),
1200            ("11655.26", "1.185"),
1201            ("11654.86", "0.195"),
1202        ];
1203
1204        for (i, (price, size)) in expected_bids.iter().enumerate() {
1205            assert_eq!(first.bids[i].price, Price::from(*price));
1206            assert_eq!(first.bids[i].size, Quantity::from(*size));
1207            assert_eq!(first.bids[i].side, OrderSide::Buy);
1208        }
1209
1210        // Check all 10 ask levels from snapshot25
1211        let expected_asks = vec![
1212            ("11657.08", "1.714"),
1213            ("11657.54", "5.4"),
1214            ("11657.56", "0.238"),
1215            ("11657.61", "0.077"),
1216            ("11657.92", "0.918"),
1217            ("11658.09", "1.015"),
1218            ("11658.12", "0.665"),
1219            ("11658.19", "0.583"),
1220            ("11658.28", "0.255"),
1221            ("11658.29", "0.656"),
1222        ];
1223
1224        for (i, (price, size)) in expected_asks.iter().enumerate() {
1225            assert_eq!(first.asks[i].price, Price::from(*price));
1226            assert_eq!(first.asks[i].size, Quantity::from(*size));
1227            assert_eq!(first.asks[i].side, OrderSide::Sell);
1228        }
1229
1230        // Logical checks: bid prices should strictly decrease
1231        for i in 1..10 {
1232            assert!(
1233                first.bids[i].price < first.bids[i - 1].price,
1234                "Bid price at level {} ({}) should be less than level {} ({})",
1235                i,
1236                first.bids[i].price,
1237                i - 1,
1238                first.bids[i - 1].price
1239            );
1240        }
1241
1242        // Logical checks: ask prices should strictly increase
1243        for i in 1..10 {
1244            assert!(
1245                first.asks[i].price > first.asks[i - 1].price,
1246                "Ask price at level {} ({}) should be greater than level {} ({})",
1247                i,
1248                first.asks[i].price,
1249                i - 1,
1250                first.asks[i - 1].price
1251            );
1252        }
1253
1254        // Logical check: spread should be positive
1255        assert!(
1256            first.asks[0].price > first.bids[0].price,
1257            "Best ask ({}) should be greater than best bid ({})",
1258            first.asks[0].price,
1259            first.bids[0].price
1260        );
1261
1262        // Check counts (all should be 1 for snapshot data)
1263        for i in 0..10 {
1264            assert_eq!(first.bid_counts[i], 1);
1265            assert_eq!(first.ask_counts[i], 1);
1266        }
1267
1268        // Check metadata - F_SNAPSHOT (32) | F_LAST (128) = 160
1269        assert_eq!(
1270            first.flags,
1271            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1272        );
1273        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1274        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1275        assert_eq!(first.sequence, 0);
1276    }
1277
1278    #[rstest]
1279    fn test_snapshot_csv_field_order_interleaved() {
1280        // This test verifies that the CSV structs correctly handle the interleaved
1281        // asks/bids field ordering from Tardis CSV files
1282
1283        let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1284asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1285asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1286asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1287asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1288asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1289binance-futures,BTCUSDT,1000000,2000000,\
1290100.5,1.0,100.4,2.0,\
1291100.6,1.1,100.3,2.1,\
1292100.7,1.2,100.2,2.2,\
1293100.8,1.3,100.1,2.3,\
1294100.9,1.4,100.0,2.4";
1295
1296        let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1297        std::fs::write(&temp_file, csv_data).unwrap();
1298
1299        let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1300        assert_eq!(depths.len(), 1);
1301
1302        let depth = &depths[0];
1303
1304        // Verify bids are correctly parsed (should be decreasing)
1305        assert_eq!(depth.bids[0].price, Price::from("100.4"));
1306        assert_eq!(depth.bids[1].price, Price::from("100.3"));
1307        assert_eq!(depth.bids[2].price, Price::from("100.2"));
1308        assert_eq!(depth.bids[3].price, Price::from("100.1"));
1309        assert_eq!(depth.bids[4].price, Price::from("100.0"));
1310
1311        // Verify asks are correctly parsed (should be increasing)
1312        assert_eq!(depth.asks[0].price, Price::from("100.5"));
1313        assert_eq!(depth.asks[1].price, Price::from("100.6"));
1314        assert_eq!(depth.asks[2].price, Price::from("100.7"));
1315        assert_eq!(depth.asks[3].price, Price::from("100.8"));
1316        assert_eq!(depth.asks[4].price, Price::from("100.9"));
1317
1318        // Verify sizes
1319        assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1320        assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1321
1322        std::fs::remove_file(temp_file).unwrap();
1323    }
1324
    #[rstest]
    fn test_load_deltas_limit_includes_clear_deltas() {
        // Test that limit counts total emitted deltas (including CLEARs)
        // When limit=5, we should get exactly 5 deltas: 1 CLEAR + 4 data deltas
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,2.0
binance-futures,BTCUSDT,1640995205000000,1640995205100000,false,bid,49997.0,0.5";

        let temp_file = std::env::temp_dir().join("test_load_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Load with limit=5 (should emit exactly 5 deltas including CLEAR)
        let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(5)).unwrap();

        // Should have exactly 5 deltas: 1 CLEAR + 4 data deltas
        // (2 snapshot rows -> Add, then 2 non-snapshot rows -> Update)
        assert_eq!(deltas.len(), 5);
        assert_eq!(deltas[0].action, BookAction::Clear);
        assert_eq!(deltas[1].action, BookAction::Add);
        assert_eq!(deltas[2].action, BookAction::Add);
        assert_eq!(deltas[3].action, BookAction::Update);
        assert_eq!(deltas[4].action, BookAction::Update);

        // deltas[3] is the 4th emitted delta, corresponding to the 3rd CSV data
        // row (the 49999.0 bid) because the leading CLEAR shifts indices by one
        assert_eq!(deltas[3].order.price, parse_price(49999.0, 1));

        // Best-effort cleanup of the temp file
        std::fs::remove_file(&temp_file).ok();
    }
1357
1358    #[rstest]
1359    fn test_load_deltas_limit_stops_at_clear() {
1360        // Test that limit=1 with snapshot data returns only the CLEAR delta
1361        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1362binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1363binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
1364
1365        let temp_file = std::env::temp_dir().join("test_load_deltas_limit_stops_at_clear.csv");
1366        std::fs::write(&temp_file, csv_data).unwrap();
1367
1368        // Load with limit=1 should only get the CLEAR delta
1369        let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(1)).unwrap();
1370
1371        assert_eq!(deltas.len(), 1);
1372        assert_eq!(deltas[0].action, BookAction::Clear);
1373
1374        std::fs::remove_file(&temp_file).ok();
1375    }
1376
1377    #[rstest]
1378    fn test_load_deltas_limit_with_mid_day_snapshot() {
1379        // Test limit behavior when there's a mid-day snapshot
1380        // The limit counts total emitted deltas including CLEARs
1381        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
1382        let deltas = load_deltas(filepath, Some(1), Some(1), None, Some(5)).unwrap();
1383
1384        // With limit=5, we get exactly 5 deltas
1385        // First snapshot inserts CLEAR, then we get 4 more data deltas
1386        assert_eq!(deltas.len(), 5);
1387        assert_eq!(deltas[0].action, BookAction::Clear);
1388    }
1389
    // Curates the large Tardis Deribit CSV.gz into NautilusTrader Parquet format.
    // Run manually: `cargo test -p nautilus-tardis test_curate_deribit_deltas -- --ignored --nocapture`
    #[rstest]
    #[ignore = "one-time dataset curation, not for routine CI"]
    fn test_curate_deribit_deltas() {
        // Source dataset lives under the repo's "large" test-data directory and
        // is not checked in for routine CI runs.
        let csv_path = get_test_data_root()
            .join("large")
            .join("tardis_deribit_incremental_book_L2_2020-04-01_BTC-PERPETUAL.csv.gz");

        let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
        let parquet_path = "/tmp/tardis_BTC-PERPETUAL.DERIBIT_2020-04-01_deltas.parquet";

        println!("Loading deltas from {}", csv_path.display());
        // Precisions inferred (None, None); no row limit — loads the full day.
        let deltas = load_deltas(&csv_path, None, None, Some(instrument_id), None).unwrap();
        let count = deltas.len();
        println!("Loaded {count} deltas");

        // Sample a non-CLEAR delta to read back the inferred precisions
        // (CLEAR deltas carry zero-precision placeholder orders).
        let sample = deltas
            .iter()
            .find(|d| d.order.price.precision > 0)
            .expect("Should have at least one non-CLEAR delta");
        let price_precision = sample.order.price.precision;
        let size_precision = sample.order.size.precision;
        println!("Precision: price={price_precision}, size={size_precision}");

        // Write in chunks to avoid stack overflow on large batches
        let metadata =
            OrderBookDelta::get_metadata(&instrument_id, price_precision, size_precision);
        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));

        println!("Writing Parquet to {parquet_path}");
        let file = File::create(parquet_path).unwrap();
        // ZSTD level 3: reasonable compression/speed trade-off for curation.
        let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
            .set_max_row_group_size(1_000_000)
            .build();
        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();

        // Encode and write one record batch per chunk of 1M deltas.
        let chunk_size = 1_000_000;
        for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
            println!("  Encoding chunk {} ({} records)...", i + 1, chunk.len());
            let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        // Report output stats for manual verification of the curated file.
        let file_size = fs::metadata(parquet_path).unwrap().len();
        println!("\n=== CURATION COMPLETE ===");
        println!("Records: {count}");
        println!("Price precision: {price_precision}");
        println!("Size precision: {size_precision}");
        println!(
            "File size: {} bytes ({:.1} MB)",
            file_size,
            file_size as f64 / 1_048_576.0
        );
        println!("Output: {parquet_path}");
        println!("\nNext steps:");
        println!("  sha256sum {parquet_path}");
    }
1451}