nautilus_tardis/csv/
load.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick,
24    },
25    enums::{OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31    csv::{
32        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34        record::{
35            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37        },
38    },
39    parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43    if explicit.is_some() {
44        return false;
45    }
46
47    let inferred = infer_precision(value).min(FIXED_PRECISION);
48    if inferred > *current {
49        *current = inferred;
50        true
51    } else {
52        false
53    }
54}
55
56fn update_deltas_precision(
57    deltas: &mut [OrderBookDelta],
58    price_precision: Option<u8>,
59    size_precision: Option<u8>,
60    current_price_precision: u8,
61    current_size_precision: u8,
62) {
63    for delta in deltas {
64        if price_precision.is_none() {
65            delta.order.price.precision = current_price_precision;
66        }
67        if size_precision.is_none() {
68            delta.order.size.precision = current_size_precision;
69        }
70    }
71}
72
73fn update_quotes_precision(
74    quotes: &mut [QuoteTick],
75    price_precision: Option<u8>,
76    size_precision: Option<u8>,
77    current_price_precision: u8,
78    current_size_precision: u8,
79) {
80    for quote in quotes {
81        if price_precision.is_none() {
82            quote.bid_price.precision = current_price_precision;
83            quote.ask_price.precision = current_price_precision;
84        }
85        if size_precision.is_none() {
86            quote.bid_size.precision = current_size_precision;
87            quote.ask_size.precision = current_size_precision;
88        }
89    }
90}
91
92fn update_trades_precision(
93    trades: &mut [TradeTick],
94    price_precision: Option<u8>,
95    size_precision: Option<u8>,
96    current_price_precision: u8,
97    current_size_precision: u8,
98) {
99    for trade in trades {
100        if price_precision.is_none() {
101            trade.price.precision = current_price_precision;
102        }
103        if size_precision.is_none() {
104            trade.size.precision = current_size_precision;
105        }
106    }
107}
108
/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// When `price_precision` or `size_precision` is `None`, precisions are inferred
/// from the data; if a later row raises an inferred precision, all previously
/// parsed deltas are updated retroactively so the returned vector is uniform.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
pub fn load_deltas<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation (capped to bound memory for unlimited loads)
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);

    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut last_ts_event = UnixNanos::default();

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let data: TardisBookUpdateRecord = record.deserialize(None)?;

        // Update precisions dynamically if not explicitly set
        let price_updated =
            update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
        let size_updated =
            update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);

        // If precision increased, update all previous deltas retroactively
        if price_updated || size_updated {
            update_deltas_precision(
                &mut deltas,
                price_precision,
                size_precision,
                current_price_precision,
                current_size_precision,
            );
        }

        let delta = parse_delta_record(
            &data,
            current_price_precision,
            current_size_precision,
            instrument_id,
        );

        // A change in ts_event means the previous delta completed its event group,
        // so mark it F_LAST before starting the next group.
        let ts_event = delta.ts_event;
        if last_ts_event != ts_event
            && let Some(last_delta) = deltas.last_mut()
        {
            // Set previous delta flags as F_LAST
            last_delta.flags = RecordFlag::F_LAST.value();
        }

        last_ts_event = ts_event;

        deltas.push(delta);

        if let Some(limit) = limit
            && deltas.len() >= limit
        {
            break;
        }
    }

    // Set F_LAST flag for final delta
    if let Some(last_delta) = deltas.last_mut() {
        last_delta.flags = RecordFlag::F_LAST.value();
    }

    Ok(deltas)
}
192
193/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
194/// automatically applying `GZip` decompression for files ending in ".gz".
195/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
196///
197/// # Errors
198///
199/// Returns an error if the file cannot be opened, read, or parsed as CSV.
200///
201/// # Panics
202///
203/// Panics if a record level cannot be parsed to depth-10.
204pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
205    filepath: P,
206    price_precision: Option<u8>,
207    size_precision: Option<u8>,
208    instrument_id: Option<InstrumentId>,
209    limit: Option<usize>,
210) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
211    // Estimate capacity for Vec pre-allocation
212    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
213    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
214
215    let mut current_price_precision = price_precision.unwrap_or(0);
216    let mut current_size_precision = size_precision.unwrap_or(0);
217
218    let mut reader = create_csv_reader(filepath)?;
219    let mut record = StringRecord::new();
220
221    while reader.read_record(&mut record)? {
222        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
223
224        // Update precisions dynamically if not explicitly set
225        let mut precision_updated = false;
226
227        if price_precision.is_none()
228            && let Some(bid_price) = data.bids_0_price
229        {
230            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
231            if inferred_price_precision > current_price_precision {
232                current_price_precision = inferred_price_precision;
233                precision_updated = true;
234            }
235        }
236
237        if size_precision.is_none()
238            && let Some(bid_amount) = data.bids_0_amount
239        {
240            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
241            if inferred_size_precision > current_size_precision {
242                current_size_precision = inferred_size_precision;
243                precision_updated = true;
244            }
245        }
246
247        // If precision increased, update all previous depths
248        if precision_updated {
249            for depth in &mut depths {
250                for i in 0..DEPTH10_LEN {
251                    if price_precision.is_none() {
252                        depth.bids[i].price.precision = current_price_precision;
253                        depth.asks[i].price.precision = current_price_precision;
254                    }
255                    if size_precision.is_none() {
256                        depth.bids[i].size.precision = current_size_precision;
257                        depth.asks[i].size.precision = current_size_precision;
258                    }
259                }
260            }
261        }
262
263        let instrument_id = match &instrument_id {
264            Some(id) => *id,
265            None => parse_instrument_id(&data.exchange, data.symbol),
266        };
267        let flags = RecordFlag::F_LAST.value();
268        let sequence = 0; // Sequence not available
269        let ts_event = parse_timestamp(data.timestamp);
270        let ts_init = parse_timestamp(data.local_timestamp);
271
272        // Initialize empty arrays
273        let mut bids = [NULL_ORDER; DEPTH10_LEN];
274        let mut asks = [NULL_ORDER; DEPTH10_LEN];
275        let mut bid_counts = [0u32; DEPTH10_LEN];
276        let mut ask_counts = [0u32; DEPTH10_LEN];
277
278        for i in 0..=4 {
279            // Create bids
280            let (bid_order, bid_count) = create_book_order(
281                OrderSide::Buy,
282                match i {
283                    0 => data.bids_0_price,
284                    1 => data.bids_1_price,
285                    2 => data.bids_2_price,
286                    3 => data.bids_3_price,
287                    4 => data.bids_4_price,
288                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
289                },
290                match i {
291                    0 => data.bids_0_amount,
292                    1 => data.bids_1_amount,
293                    2 => data.bids_2_amount,
294                    3 => data.bids_3_amount,
295                    4 => data.bids_4_amount,
296                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
297                },
298                current_price_precision,
299                current_size_precision,
300            );
301            bids[i] = bid_order;
302            bid_counts[i] = bid_count;
303
304            // Create asks
305            let (ask_order, ask_count) = create_book_order(
306                OrderSide::Sell,
307                match i {
308                    0 => data.asks_0_price,
309                    1 => data.asks_1_price,
310                    2 => data.asks_2_price,
311                    3 => data.asks_3_price,
312                    4 => data.asks_4_price,
313                    _ => None, // Unreachable, but for safety
314                },
315                match i {
316                    0 => data.asks_0_amount,
317                    1 => data.asks_1_amount,
318                    2 => data.asks_2_amount,
319                    3 => data.asks_3_amount,
320                    4 => data.asks_4_amount,
321                    _ => None, // Unreachable, but for safety
322                },
323                current_price_precision,
324                current_size_precision,
325            );
326            asks[i] = ask_order;
327            ask_counts[i] = ask_count;
328        }
329
330        let depth = OrderBookDepth10::new(
331            instrument_id,
332            bids,
333            asks,
334            bid_counts,
335            ask_counts,
336            flags,
337            sequence,
338            ts_event,
339            ts_init,
340        );
341
342        depths.push(depth);
343
344        if let Some(limit) = limit
345            && depths.len() >= limit
346        {
347            break;
348        }
349    }
350
351    Ok(depths)
352}
353
354/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
355/// automatically applying `GZip` decompression for files ending in ".gz".
356/// Load order book depth-10 snapshots (25-level) from a CSV or gzipped CSV file.
357///
358/// # Errors
359///
360/// Returns an error if the file cannot be opened, read, or parsed as CSV.
361///
362/// # Panics
363///
364/// Panics if a record level cannot be parsed to depth-10.
365pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
366    filepath: P,
367    price_precision: Option<u8>,
368    size_precision: Option<u8>,
369    instrument_id: Option<InstrumentId>,
370    limit: Option<usize>,
371) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
372    // Estimate capacity for Vec pre-allocation
373    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
374    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
375
376    let mut current_price_precision = price_precision.unwrap_or(0);
377    let mut current_size_precision = size_precision.unwrap_or(0);
378    let mut reader = create_csv_reader(filepath)?;
379    let mut record = StringRecord::new();
380
381    while reader.read_record(&mut record)? {
382        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
383
384        // Update precisions dynamically if not explicitly set
385        let mut precision_updated = false;
386
387        if price_precision.is_none()
388            && let Some(bid_price) = data.bids_0_price
389        {
390            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
391            if inferred_price_precision > current_price_precision {
392                current_price_precision = inferred_price_precision;
393                precision_updated = true;
394            }
395        }
396
397        if size_precision.is_none()
398            && let Some(bid_amount) = data.bids_0_amount
399        {
400            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
401            if inferred_size_precision > current_size_precision {
402                current_size_precision = inferred_size_precision;
403                precision_updated = true;
404            }
405        }
406
407        // If precision increased, update all previous depths
408        if precision_updated {
409            for depth in &mut depths {
410                for i in 0..DEPTH10_LEN {
411                    if price_precision.is_none() {
412                        depth.bids[i].price.precision = current_price_precision;
413                        depth.asks[i].price.precision = current_price_precision;
414                    }
415                    if size_precision.is_none() {
416                        depth.bids[i].size.precision = current_size_precision;
417                        depth.asks[i].size.precision = current_size_precision;
418                    }
419                }
420            }
421        }
422
423        let instrument_id = match &instrument_id {
424            Some(id) => *id,
425            None => parse_instrument_id(&data.exchange, data.symbol),
426        };
427        let flags = RecordFlag::F_LAST.value();
428        let sequence = 0; // Sequence not available
429        let ts_event = parse_timestamp(data.timestamp);
430        let ts_init = parse_timestamp(data.local_timestamp);
431
432        // Initialize empty arrays for the first 10 levels only
433        let mut bids = [NULL_ORDER; DEPTH10_LEN];
434        let mut asks = [NULL_ORDER; DEPTH10_LEN];
435        let mut bid_counts = [0u32; DEPTH10_LEN];
436        let mut ask_counts = [0u32; DEPTH10_LEN];
437
438        // Fill only the first 10 levels from the 25-level record
439        for i in 0..DEPTH10_LEN {
440            // Create bids
441            let (bid_order, bid_count) = create_book_order(
442                OrderSide::Buy,
443                match i {
444                    0 => data.bids_0_price,
445                    1 => data.bids_1_price,
446                    2 => data.bids_2_price,
447                    3 => data.bids_3_price,
448                    4 => data.bids_4_price,
449                    5 => data.bids_5_price,
450                    6 => data.bids_6_price,
451                    7 => data.bids_7_price,
452                    8 => data.bids_8_price,
453                    9 => data.bids_9_price,
454                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
455                },
456                match i {
457                    0 => data.bids_0_amount,
458                    1 => data.bids_1_amount,
459                    2 => data.bids_2_amount,
460                    3 => data.bids_3_amount,
461                    4 => data.bids_4_amount,
462                    5 => data.bids_5_amount,
463                    6 => data.bids_6_amount,
464                    7 => data.bids_7_amount,
465                    8 => data.bids_8_amount,
466                    9 => data.bids_9_amount,
467                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
468                },
469                current_price_precision,
470                current_size_precision,
471            );
472            bids[i] = bid_order;
473            bid_counts[i] = bid_count;
474
475            // Create asks
476            let (ask_order, ask_count) = create_book_order(
477                OrderSide::Sell,
478                match i {
479                    0 => data.asks_0_price,
480                    1 => data.asks_1_price,
481                    2 => data.asks_2_price,
482                    3 => data.asks_3_price,
483                    4 => data.asks_4_price,
484                    5 => data.asks_5_price,
485                    6 => data.asks_6_price,
486                    7 => data.asks_7_price,
487                    8 => data.asks_8_price,
488                    9 => data.asks_9_price,
489                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
490                },
491                match i {
492                    0 => data.asks_0_amount,
493                    1 => data.asks_1_amount,
494                    2 => data.asks_2_amount,
495                    3 => data.asks_3_amount,
496                    4 => data.asks_4_amount,
497                    5 => data.asks_5_amount,
498                    6 => data.asks_6_amount,
499                    7 => data.asks_7_amount,
500                    8 => data.asks_8_amount,
501                    9 => data.asks_9_amount,
502                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
503                },
504                current_price_precision,
505                current_size_precision,
506            );
507            asks[i] = ask_order;
508            ask_counts[i] = ask_count;
509        }
510
511        let depth = OrderBookDepth10::new(
512            instrument_id,
513            bids,
514            asks,
515            bid_counts,
516            ask_counts,
517            flags,
518            sequence,
519            ts_event,
520            ts_init,
521        );
522
523        depths.push(depth);
524
525        if let Some(limit) = limit
526            && depths.len() >= limit
527        {
528            break;
529        }
530    }
531
532    Ok(depths)
533}
534
535/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
536/// automatically applying `GZip` decompression for files ending in ".gz".
537/// Load quote ticks from a CSV or gzipped CSV file.
538///
539/// # Errors
540///
541/// Returns an error if the file cannot be opened, read, or parsed as CSV.
542///
543/// # Panics
544///
545/// Panics if a record has invalid data or CSV parsing errors.
546pub fn load_quotes<P: AsRef<Path>>(
547    filepath: P,
548    price_precision: Option<u8>,
549    size_precision: Option<u8>,
550    instrument_id: Option<InstrumentId>,
551    limit: Option<usize>,
552) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
553    // Estimate capacity for Vec pre-allocation
554    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
555    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
556
557    let mut current_price_precision = price_precision.unwrap_or(0);
558    let mut current_size_precision = size_precision.unwrap_or(0);
559    let mut reader = create_csv_reader(filepath)?;
560    let mut record = StringRecord::new();
561
562    while reader.read_record(&mut record)? {
563        let data: TardisQuoteRecord = record.deserialize(None)?;
564
565        // Update precisions dynamically if not explicitly set
566        let mut precision_updated = false;
567
568        if price_precision.is_none()
569            && let Some(bid_price) = data.bid_price
570        {
571            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
572            if inferred_price_precision > current_price_precision {
573                current_price_precision = inferred_price_precision;
574                precision_updated = true;
575            }
576        }
577
578        if size_precision.is_none()
579            && let Some(bid_amount) = data.bid_amount
580        {
581            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
582            if inferred_size_precision > current_size_precision {
583                current_size_precision = inferred_size_precision;
584                precision_updated = true;
585            }
586        }
587
588        // If precision increased, update all previous quotes
589        if precision_updated {
590            update_quotes_precision(
591                &mut quotes,
592                price_precision,
593                size_precision,
594                current_price_precision,
595                current_size_precision,
596            );
597        }
598
599        let quote = parse_quote_record(
600            &data,
601            current_price_precision,
602            current_size_precision,
603            instrument_id,
604        );
605
606        quotes.push(quote);
607
608        if let Some(limit) = limit
609            && quotes.len() >= limit
610        {
611            break;
612        }
613    }
614
615    Ok(quotes)
616}
617
618/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
619/// automatically applying `GZip` decompression for files ending in ".gz".
620/// Load trade ticks from a CSV or gzipped CSV file.
621///
622/// # Errors
623///
624/// Returns an error if the file cannot be opened, read, or parsed as CSV.
625///
626/// # Panics
627///
628/// Panics if a record has invalid trade size or CSV parsing errors.
629pub fn load_trades<P: AsRef<Path>>(
630    filepath: P,
631    price_precision: Option<u8>,
632    size_precision: Option<u8>,
633    instrument_id: Option<InstrumentId>,
634    limit: Option<usize>,
635) -> Result<Vec<TradeTick>, Box<dyn Error>> {
636    // Estimate capacity for Vec pre-allocation
637    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
638    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
639
640    let mut current_price_precision = price_precision.unwrap_or(0);
641    let mut current_size_precision = size_precision.unwrap_or(0);
642    let mut reader = create_csv_reader(filepath)?;
643    let mut record = StringRecord::new();
644
645    while reader.read_record(&mut record)? {
646        let data: TardisTradeRecord = record.deserialize(None)?;
647
648        // Update precisions dynamically if not explicitly set
649        let mut precision_updated = false;
650
651        if price_precision.is_none() {
652            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
653            if inferred_price_precision > current_price_precision {
654                current_price_precision = inferred_price_precision;
655                precision_updated = true;
656            }
657        }
658
659        if size_precision.is_none() {
660            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
661            if inferred_size_precision > current_size_precision {
662                current_size_precision = inferred_size_precision;
663                precision_updated = true;
664            }
665        }
666
667        // If precision increased, update all previous trades
668        if precision_updated {
669            update_trades_precision(
670                &mut trades,
671                price_precision,
672                size_precision,
673                current_price_precision,
674                current_size_precision,
675            );
676        }
677
678        let size = Quantity::new_checked(data.amount, current_size_precision)?;
679
680        if size.is_positive() {
681            let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
682
683            trades.push(trade);
684
685            if let Some(limit) = limit
686                && trades.len() >= limit
687            {
688                break;
689            }
690        } else {
691            log::warn!("Skipping zero-sized trade: {data:?}");
692        }
693    }
694
695    Ok(trades)
696}
697
698/// Loads [`FundingRateUpdate`]s from a Tardis format derivative ticker CSV at the given `filepath`,
699/// automatically applying `GZip` decompression for files ending in ".gz".
700///
701/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
702/// fields from derivative ticker data to create funding rate updates.
703///
704/// # Errors
705///
706/// Returns an error if the file cannot be opened, read, or parsed as CSV.
707pub fn load_funding_rates<P: AsRef<Path>>(
708    filepath: P,
709    instrument_id: Option<InstrumentId>,
710    limit: Option<usize>,
711) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
712    // Estimate capacity for Vec pre-allocation
713    let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
714    let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
715
716    let mut reader = create_csv_reader(filepath)?;
717    let mut record = StringRecord::new();
718
719    while reader.read_record(&mut record)? {
720        let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
721
722        // Parse to funding rate update (returns None if no funding data)
723        if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
724            funding_rates.push(funding_rate);
725
726            if let Some(limit) = limit
727                && funding_rates.len() >= limit
728            {
729                break;
730            }
731        }
732    }
733
734    Ok(funding_rates)
735}
736
737////////////////////////////////////////////////////////////////////////////////
738// Tests
739////////////////////////////////////////////////////////////////////////////////
740#[cfg(test)]
741mod tests {
742    use nautilus_model::{
743        enums::{AggressorSide, BookAction},
744        identifiers::{InstrumentId, TradeId},
745        types::{Price, Quantity},
746    };
747    use nautilus_testkit::common::{
748        ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
749        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
750        ensure_data_exists_tardis_huobi_quotes,
751    };
752    use rstest::*;
753
754    use super::*;
755    use crate::{parse::parse_price, tests::get_test_data_path};
756
757    #[rstest]
758    #[case(0.0, 0)]
759    #[case(42.0, 0)]
760    #[case(0.1, 1)]
761    #[case(0.25, 2)]
762    #[case(123.0001, 4)]
763    #[case(-42.987654321,       9)]
764    #[case(1.234_567_890_123, 12)]
765    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
766        assert_eq!(infer_precision(input), expected);
767    }
768
769    #[rstest]
770    pub fn test_dynamic_precision_inference() {
771        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
772binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
773binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
774binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
775binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
776binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
777
778        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
779        std::fs::write(&temp_file, csv_data).unwrap();
780
781        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
782
783        assert_eq!(deltas.len(), 5);
784
785        for (i, delta) in deltas.iter().enumerate() {
786            assert_eq!(
787                delta.order.price.precision, 4,
788                "Price precision should be 4 for delta {i}",
789            );
790            assert_eq!(
791                delta.order.size.precision, 1,
792                "Size precision should be 1 for delta {i}",
793            );
794        }
795
796        // Test exact values to ensure retroactive precision updates work correctly
797        assert_eq!(deltas[0].order.price, parse_price(50000.0, 4));
798        assert_eq!(deltas[0].order.size, Quantity::new(1.0, 1));
799
800        assert_eq!(deltas[1].order.price, parse_price(49999.5, 4));
801        assert_eq!(deltas[1].order.size, Quantity::new(2.0, 1));
802
803        assert_eq!(deltas[2].order.price, parse_price(50000.12, 4));
804        assert_eq!(deltas[2].order.size, Quantity::new(1.5, 1));
805
806        assert_eq!(deltas[3].order.price, parse_price(49999.123, 4));
807        assert_eq!(deltas[3].order.size, Quantity::new(3.0, 1));
808
809        assert_eq!(deltas[4].order.price, parse_price(50000.1234, 4));
810        assert_eq!(deltas[4].order.size, Quantity::new(0.5, 1));
811
812        assert_eq!(
813            deltas[0].order.price.precision,
814            deltas[4].order.price.precision
815        );
816        assert_eq!(
817            deltas[0].order.size.precision,
818            deltas[2].order.size.precision
819        );
820
821        std::fs::remove_file(&temp_file).ok();
822    }
823
824    // TODO: Flaky in CI, potentially from syncing large test data files from cache
825    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
826    #[rstest]
827    #[case(Some(1), Some(0))] // Explicit precisions
828    #[case(None, None)] // Inferred precisions
829    pub fn test_read_deltas(
830        #[case] price_precision: Option<u8>,
831        #[case] size_precision: Option<u8>,
832    ) {
833        let filepath = ensure_data_exists_tardis_deribit_book_l2();
834        let deltas = load_deltas(
835            filepath,
836            price_precision,
837            size_precision,
838            None,
839            Some(10_000),
840        )
841        .unwrap();
842
843        assert_eq!(deltas.len(), 10_000);
844        assert_eq!(
845            deltas[0].instrument_id,
846            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
847        );
848        assert_eq!(deltas[0].action, BookAction::Add);
849        assert_eq!(deltas[0].order.side, OrderSide::Sell);
850        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
851        assert_eq!(deltas[0].order.size, Quantity::from("18640"));
852        assert_eq!(deltas[0].flags, 0);
853        assert_eq!(deltas[0].sequence, 0);
854        assert_eq!(deltas[0].ts_event, 1585699200245000000);
855        assert_eq!(deltas[0].ts_init, 1585699200355684000);
856    }
857
858    // TODO: Flaky in CI, potentially from syncing large test data files from cache
859    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
860    #[rstest]
861    #[case(Some(2), Some(3))] // Explicit precisions
862    #[case(None, None)] // Inferred precisions
863    pub fn test_read_depth10s_from_snapshot5(
864        #[case] price_precision: Option<u8>,
865        #[case] size_precision: Option<u8>,
866    ) {
867        let filepath = ensure_data_exists_tardis_binance_snapshot5();
868        let depths = load_depth10_from_snapshot5(
869            filepath,
870            price_precision,
871            size_precision,
872            None,
873            Some(10_000),
874        )
875        .unwrap();
876
877        assert_eq!(depths.len(), 10_000);
878        assert_eq!(
879            depths[0].instrument_id,
880            InstrumentId::from("BTCUSDT.BINANCE")
881        );
882        assert_eq!(depths[0].bids.len(), 10);
883        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
884        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
885        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
886        assert_eq!(depths[0].bids[0].order_id, 0);
887        assert_eq!(depths[0].asks.len(), 10);
888        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
889        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
890        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
891        assert_eq!(depths[0].asks[0].order_id, 0);
892        assert_eq!(depths[0].bid_counts[0], 1);
893        assert_eq!(depths[0].ask_counts[0], 1);
894        assert_eq!(depths[0].flags, 128);
895        assert_eq!(depths[0].ts_event, 1598918403696000000);
896        assert_eq!(depths[0].ts_init, 1598918403810979000);
897        assert_eq!(depths[0].sequence, 0);
898    }
899
900    // TODO: Flaky in CI, potentially from syncing large test data files from cache
901    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
902    #[rstest]
903    #[case(Some(2), Some(3))] // Explicit precisions
904    #[case(None, None)] // Inferred precisions
905    pub fn test_read_depth10s_from_snapshot25(
906        #[case] price_precision: Option<u8>,
907        #[case] size_precision: Option<u8>,
908    ) {
909        let filepath = ensure_data_exists_tardis_binance_snapshot25();
910        let depths = load_depth10_from_snapshot25(
911            filepath,
912            price_precision,
913            size_precision,
914            None,
915            Some(10_000),
916        )
917        .unwrap();
918
919        assert_eq!(depths.len(), 10_000);
920        assert_eq!(
921            depths[0].instrument_id,
922            InstrumentId::from("BTCUSDT.BINANCE")
923        );
924        assert_eq!(depths[0].bids.len(), 10);
925        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
926        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
927        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
928        assert_eq!(depths[0].bids[0].order_id, 0);
929        assert_eq!(depths[0].asks.len(), 10);
930        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
931        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
932        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
933        assert_eq!(depths[0].asks[0].order_id, 0);
934        assert_eq!(depths[0].bid_counts[0], 1);
935        assert_eq!(depths[0].ask_counts[0], 1);
936        assert_eq!(depths[0].flags, 128);
937        assert_eq!(depths[0].ts_event, 1598918403696000000);
938        assert_eq!(depths[0].ts_init, 1598918403810979000);
939        assert_eq!(depths[0].sequence, 0);
940    }
941
942    // TODO: Flaky in CI, potentially from syncing large test data files from cache
943    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
944    #[rstest]
945    #[case(Some(1), Some(0))] // Explicit precisions
946    #[case(None, None)] // Inferred precisions
947    pub fn test_read_quotes(
948        #[case] price_precision: Option<u8>,
949        #[case] size_precision: Option<u8>,
950    ) {
951        let filepath = ensure_data_exists_tardis_huobi_quotes();
952        let quotes = load_quotes(
953            filepath,
954            price_precision,
955            size_precision,
956            None,
957            Some(10_000),
958        )
959        .unwrap();
960
961        assert_eq!(quotes.len(), 10_000);
962        assert_eq!(
963            quotes[0].instrument_id,
964            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
965        );
966        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
967        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
968        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
969        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
970        assert_eq!(quotes[0].ts_event, 1588291201099000000);
971        assert_eq!(quotes[0].ts_init, 1588291201234268000);
972    }
973
974    // TODO: Flaky in CI, potentially from syncing large test data files from cache
975    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
976    #[rstest]
977    #[case(Some(1), Some(0))] // Explicit precisions
978    #[case(None, None)] // Inferred precisions
979    pub fn test_read_trades(
980        #[case] price_precision: Option<u8>,
981        #[case] size_precision: Option<u8>,
982    ) {
983        let filepath = ensure_data_exists_tardis_bitmex_trades();
984        let trades = load_trades(
985            filepath,
986            price_precision,
987            size_precision,
988            None,
989            Some(10_000),
990        )
991        .unwrap();
992
993        assert_eq!(trades.len(), 10_000);
994        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
995        assert_eq!(trades[0].price, Price::from("8531.5"));
996        assert_eq!(trades[0].size, Quantity::from("2152"));
997        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
998        assert_eq!(
999            trades[0].trade_id,
1000            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1001        );
1002        assert_eq!(trades[0].ts_event, 1583020803145000000);
1003        assert_eq!(trades[0].ts_init, 1583020803307160000);
1004    }
1005
1006    #[rstest]
1007    pub fn test_load_trades_with_zero_sized_trade() {
1008        // Create test CSV data with one zero-sized trade that should be skipped
1009        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1010binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1011binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1012binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1013binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1014
1015        let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1016        std::fs::write(&temp_file, csv_data).unwrap();
1017
1018        let trades = load_trades(
1019            &temp_file,
1020            Some(4),
1021            Some(1),
1022            None,
1023            None, // No limit, load all
1024        )
1025        .unwrap();
1026
1027        // Should have 3 trades (zero-sized trade skipped)
1028        assert_eq!(trades.len(), 3);
1029
1030        // Verify the correct trades were loaded (not the zero-sized one)
1031        assert_eq!(trades[0].size, Quantity::from("1.0"));
1032        assert_eq!(trades[1].size, Quantity::from("1.5"));
1033        assert_eq!(trades[2].size, Quantity::from("3.0"));
1034
1035        // Verify trade IDs to confirm correct trades were loaded
1036        assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1037        assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1038        assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1039
1040        std::fs::remove_file(&temp_file).ok();
1041    }
1042
1043    #[rstest]
1044    pub fn test_load_trades_from_local_file() {
1045        let filepath = get_test_data_path("csv/trades_1.csv");
1046        let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1047        assert_eq!(trades.len(), 2);
1048        assert_eq!(trades[0].price, Price::from("8531.5"));
1049        assert_eq!(trades[1].size, Quantity::from("1000"));
1050    }
1051
1052    #[rstest]
1053    pub fn test_load_deltas_from_local_file() {
1054        let filepath = get_test_data_path("csv/deltas_1.csv");
1055        let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1056        assert_eq!(deltas.len(), 2);
1057        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
1058        assert_eq!(deltas[1].order.size, Quantity::from("10000"));
1059    }
1060}