1use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick,
24 },
25 enums::{OrderSide, RecordFlag},
26 identifiers::InstrumentId,
27 types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31 csv::{
32 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34 record::{
35 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37 },
38 },
39 parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43 if explicit.is_some() {
44 return false;
45 }
46
47 let inferred = infer_precision(value).min(FIXED_PRECISION);
48 if inferred > *current {
49 *current = inferred;
50 true
51 } else {
52 false
53 }
54}
55
56fn update_deltas_precision(
57 deltas: &mut [OrderBookDelta],
58 price_precision: Option<u8>,
59 size_precision: Option<u8>,
60 current_price_precision: u8,
61 current_size_precision: u8,
62) {
63 for delta in deltas {
64 if price_precision.is_none() {
65 delta.order.price.precision = current_price_precision;
66 }
67 if size_precision.is_none() {
68 delta.order.size.precision = current_size_precision;
69 }
70 }
71}
72
73fn update_quotes_precision(
74 quotes: &mut [QuoteTick],
75 price_precision: Option<u8>,
76 size_precision: Option<u8>,
77 current_price_precision: u8,
78 current_size_precision: u8,
79) {
80 for quote in quotes {
81 if price_precision.is_none() {
82 quote.bid_price.precision = current_price_precision;
83 quote.ask_price.precision = current_price_precision;
84 }
85 if size_precision.is_none() {
86 quote.bid_size.precision = current_size_precision;
87 quote.ask_size.precision = current_size_precision;
88 }
89 }
90}
91
92fn update_trades_precision(
93 trades: &mut [TradeTick],
94 price_precision: Option<u8>,
95 size_precision: Option<u8>,
96 current_price_precision: u8,
97 current_size_precision: u8,
98) {
99 for trade in trades {
100 if price_precision.is_none() {
101 trade.price.precision = current_price_precision;
102 }
103 if size_precision.is_none() {
104 trade.size.precision = current_size_precision;
105 }
106 }
107}
108
109pub fn load_deltas<P: AsRef<Path>>(
121 filepath: P,
122 price_precision: Option<u8>,
123 size_precision: Option<u8>,
124 instrument_id: Option<InstrumentId>,
125 limit: Option<usize>,
126) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
127 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
129 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
130
131 let mut current_price_precision = price_precision.unwrap_or(0);
132 let mut current_size_precision = size_precision.unwrap_or(0);
133 let mut last_ts_event = UnixNanos::default();
134
135 let mut reader = create_csv_reader(filepath)?;
136 let mut record = StringRecord::new();
137
138 while reader.read_record(&mut record)? {
139 let data: TardisBookUpdateRecord = record.deserialize(None)?;
140
141 let price_updated =
143 update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
144 let size_updated =
145 update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
146
147 if price_updated || size_updated {
149 update_deltas_precision(
150 &mut deltas,
151 price_precision,
152 size_precision,
153 current_price_precision,
154 current_size_precision,
155 );
156 }
157
158 let delta = parse_delta_record(
159 &data,
160 current_price_precision,
161 current_size_precision,
162 instrument_id,
163 );
164
165 let ts_event = delta.ts_event;
167 if last_ts_event != ts_event
168 && let Some(last_delta) = deltas.last_mut()
169 {
170 last_delta.flags = RecordFlag::F_LAST.value();
172 }
173
174 last_ts_event = ts_event;
175
176 deltas.push(delta);
177
178 if let Some(limit) = limit
179 && deltas.len() >= limit
180 {
181 break;
182 }
183 }
184
185 if let Some(last_delta) = deltas.last_mut() {
187 last_delta.flags = RecordFlag::F_LAST.value();
188 }
189
190 Ok(deltas)
191}
192
193pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
205 filepath: P,
206 price_precision: Option<u8>,
207 size_precision: Option<u8>,
208 instrument_id: Option<InstrumentId>,
209 limit: Option<usize>,
210) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
211 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
213 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
214
215 let mut current_price_precision = price_precision.unwrap_or(0);
216 let mut current_size_precision = size_precision.unwrap_or(0);
217
218 let mut reader = create_csv_reader(filepath)?;
219 let mut record = StringRecord::new();
220
221 while reader.read_record(&mut record)? {
222 let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
223
224 let mut precision_updated = false;
226
227 if price_precision.is_none()
228 && let Some(bid_price) = data.bids_0_price
229 {
230 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
231 if inferred_price_precision > current_price_precision {
232 current_price_precision = inferred_price_precision;
233 precision_updated = true;
234 }
235 }
236
237 if size_precision.is_none()
238 && let Some(bid_amount) = data.bids_0_amount
239 {
240 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
241 if inferred_size_precision > current_size_precision {
242 current_size_precision = inferred_size_precision;
243 precision_updated = true;
244 }
245 }
246
247 if precision_updated {
249 for depth in &mut depths {
250 for i in 0..DEPTH10_LEN {
251 if price_precision.is_none() {
252 depth.bids[i].price.precision = current_price_precision;
253 depth.asks[i].price.precision = current_price_precision;
254 }
255 if size_precision.is_none() {
256 depth.bids[i].size.precision = current_size_precision;
257 depth.asks[i].size.precision = current_size_precision;
258 }
259 }
260 }
261 }
262
263 let instrument_id = match &instrument_id {
264 Some(id) => *id,
265 None => parse_instrument_id(&data.exchange, data.symbol),
266 };
267 let flags = RecordFlag::F_LAST.value();
268 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
270 let ts_init = parse_timestamp(data.local_timestamp);
271
272 let mut bids = [NULL_ORDER; DEPTH10_LEN];
274 let mut asks = [NULL_ORDER; DEPTH10_LEN];
275 let mut bid_counts = [0u32; DEPTH10_LEN];
276 let mut ask_counts = [0u32; DEPTH10_LEN];
277
278 for i in 0..=4 {
279 let (bid_order, bid_count) = create_book_order(
281 OrderSide::Buy,
282 match i {
283 0 => data.bids_0_price,
284 1 => data.bids_1_price,
285 2 => data.bids_2_price,
286 3 => data.bids_3_price,
287 4 => data.bids_4_price,
288 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
289 },
290 match i {
291 0 => data.bids_0_amount,
292 1 => data.bids_1_amount,
293 2 => data.bids_2_amount,
294 3 => data.bids_3_amount,
295 4 => data.bids_4_amount,
296 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
297 },
298 current_price_precision,
299 current_size_precision,
300 );
301 bids[i] = bid_order;
302 bid_counts[i] = bid_count;
303
304 let (ask_order, ask_count) = create_book_order(
306 OrderSide::Sell,
307 match i {
308 0 => data.asks_0_price,
309 1 => data.asks_1_price,
310 2 => data.asks_2_price,
311 3 => data.asks_3_price,
312 4 => data.asks_4_price,
313 _ => None, },
315 match i {
316 0 => data.asks_0_amount,
317 1 => data.asks_1_amount,
318 2 => data.asks_2_amount,
319 3 => data.asks_3_amount,
320 4 => data.asks_4_amount,
321 _ => None, },
323 current_price_precision,
324 current_size_precision,
325 );
326 asks[i] = ask_order;
327 ask_counts[i] = ask_count;
328 }
329
330 let depth = OrderBookDepth10::new(
331 instrument_id,
332 bids,
333 asks,
334 bid_counts,
335 ask_counts,
336 flags,
337 sequence,
338 ts_event,
339 ts_init,
340 );
341
342 depths.push(depth);
343
344 if let Some(limit) = limit
345 && depths.len() >= limit
346 {
347 break;
348 }
349 }
350
351 Ok(depths)
352}
353
354pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
366 filepath: P,
367 price_precision: Option<u8>,
368 size_precision: Option<u8>,
369 instrument_id: Option<InstrumentId>,
370 limit: Option<usize>,
371) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
372 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
374 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
375
376 let mut current_price_precision = price_precision.unwrap_or(0);
377 let mut current_size_precision = size_precision.unwrap_or(0);
378 let mut reader = create_csv_reader(filepath)?;
379 let mut record = StringRecord::new();
380
381 while reader.read_record(&mut record)? {
382 let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
383
384 let mut precision_updated = false;
386
387 if price_precision.is_none()
388 && let Some(bid_price) = data.bids_0_price
389 {
390 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
391 if inferred_price_precision > current_price_precision {
392 current_price_precision = inferred_price_precision;
393 precision_updated = true;
394 }
395 }
396
397 if size_precision.is_none()
398 && let Some(bid_amount) = data.bids_0_amount
399 {
400 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
401 if inferred_size_precision > current_size_precision {
402 current_size_precision = inferred_size_precision;
403 precision_updated = true;
404 }
405 }
406
407 if precision_updated {
409 for depth in &mut depths {
410 for i in 0..DEPTH10_LEN {
411 if price_precision.is_none() {
412 depth.bids[i].price.precision = current_price_precision;
413 depth.asks[i].price.precision = current_price_precision;
414 }
415 if size_precision.is_none() {
416 depth.bids[i].size.precision = current_size_precision;
417 depth.asks[i].size.precision = current_size_precision;
418 }
419 }
420 }
421 }
422
423 let instrument_id = match &instrument_id {
424 Some(id) => *id,
425 None => parse_instrument_id(&data.exchange, data.symbol),
426 };
427 let flags = RecordFlag::F_LAST.value();
428 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
430 let ts_init = parse_timestamp(data.local_timestamp);
431
432 let mut bids = [NULL_ORDER; DEPTH10_LEN];
434 let mut asks = [NULL_ORDER; DEPTH10_LEN];
435 let mut bid_counts = [0u32; DEPTH10_LEN];
436 let mut ask_counts = [0u32; DEPTH10_LEN];
437
438 for i in 0..DEPTH10_LEN {
440 let (bid_order, bid_count) = create_book_order(
442 OrderSide::Buy,
443 match i {
444 0 => data.bids_0_price,
445 1 => data.bids_1_price,
446 2 => data.bids_2_price,
447 3 => data.bids_3_price,
448 4 => data.bids_4_price,
449 5 => data.bids_5_price,
450 6 => data.bids_6_price,
451 7 => data.bids_7_price,
452 8 => data.bids_8_price,
453 9 => data.bids_9_price,
454 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
455 },
456 match i {
457 0 => data.bids_0_amount,
458 1 => data.bids_1_amount,
459 2 => data.bids_2_amount,
460 3 => data.bids_3_amount,
461 4 => data.bids_4_amount,
462 5 => data.bids_5_amount,
463 6 => data.bids_6_amount,
464 7 => data.bids_7_amount,
465 8 => data.bids_8_amount,
466 9 => data.bids_9_amount,
467 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
468 },
469 current_price_precision,
470 current_size_precision,
471 );
472 bids[i] = bid_order;
473 bid_counts[i] = bid_count;
474
475 let (ask_order, ask_count) = create_book_order(
477 OrderSide::Sell,
478 match i {
479 0 => data.asks_0_price,
480 1 => data.asks_1_price,
481 2 => data.asks_2_price,
482 3 => data.asks_3_price,
483 4 => data.asks_4_price,
484 5 => data.asks_5_price,
485 6 => data.asks_6_price,
486 7 => data.asks_7_price,
487 8 => data.asks_8_price,
488 9 => data.asks_9_price,
489 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
490 },
491 match i {
492 0 => data.asks_0_amount,
493 1 => data.asks_1_amount,
494 2 => data.asks_2_amount,
495 3 => data.asks_3_amount,
496 4 => data.asks_4_amount,
497 5 => data.asks_5_amount,
498 6 => data.asks_6_amount,
499 7 => data.asks_7_amount,
500 8 => data.asks_8_amount,
501 9 => data.asks_9_amount,
502 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
503 },
504 current_price_precision,
505 current_size_precision,
506 );
507 asks[i] = ask_order;
508 ask_counts[i] = ask_count;
509 }
510
511 let depth = OrderBookDepth10::new(
512 instrument_id,
513 bids,
514 asks,
515 bid_counts,
516 ask_counts,
517 flags,
518 sequence,
519 ts_event,
520 ts_init,
521 );
522
523 depths.push(depth);
524
525 if let Some(limit) = limit
526 && depths.len() >= limit
527 {
528 break;
529 }
530 }
531
532 Ok(depths)
533}
534
535pub fn load_quotes<P: AsRef<Path>>(
547 filepath: P,
548 price_precision: Option<u8>,
549 size_precision: Option<u8>,
550 instrument_id: Option<InstrumentId>,
551 limit: Option<usize>,
552) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
553 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
555 let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
556
557 let mut current_price_precision = price_precision.unwrap_or(0);
558 let mut current_size_precision = size_precision.unwrap_or(0);
559 let mut reader = create_csv_reader(filepath)?;
560 let mut record = StringRecord::new();
561
562 while reader.read_record(&mut record)? {
563 let data: TardisQuoteRecord = record.deserialize(None)?;
564
565 let mut precision_updated = false;
567
568 if price_precision.is_none()
569 && let Some(bid_price) = data.bid_price
570 {
571 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
572 if inferred_price_precision > current_price_precision {
573 current_price_precision = inferred_price_precision;
574 precision_updated = true;
575 }
576 }
577
578 if size_precision.is_none()
579 && let Some(bid_amount) = data.bid_amount
580 {
581 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
582 if inferred_size_precision > current_size_precision {
583 current_size_precision = inferred_size_precision;
584 precision_updated = true;
585 }
586 }
587
588 if precision_updated {
590 update_quotes_precision(
591 &mut quotes,
592 price_precision,
593 size_precision,
594 current_price_precision,
595 current_size_precision,
596 );
597 }
598
599 let quote = parse_quote_record(
600 &data,
601 current_price_precision,
602 current_size_precision,
603 instrument_id,
604 );
605
606 quotes.push(quote);
607
608 if let Some(limit) = limit
609 && quotes.len() >= limit
610 {
611 break;
612 }
613 }
614
615 Ok(quotes)
616}
617
618pub fn load_trades<P: AsRef<Path>>(
630 filepath: P,
631 price_precision: Option<u8>,
632 size_precision: Option<u8>,
633 instrument_id: Option<InstrumentId>,
634 limit: Option<usize>,
635) -> Result<Vec<TradeTick>, Box<dyn Error>> {
636 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
638 let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
639
640 let mut current_price_precision = price_precision.unwrap_or(0);
641 let mut current_size_precision = size_precision.unwrap_or(0);
642 let mut reader = create_csv_reader(filepath)?;
643 let mut record = StringRecord::new();
644
645 while reader.read_record(&mut record)? {
646 let data: TardisTradeRecord = record.deserialize(None)?;
647
648 let mut precision_updated = false;
650
651 if price_precision.is_none() {
652 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
653 if inferred_price_precision > current_price_precision {
654 current_price_precision = inferred_price_precision;
655 precision_updated = true;
656 }
657 }
658
659 if size_precision.is_none() {
660 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
661 if inferred_size_precision > current_size_precision {
662 current_size_precision = inferred_size_precision;
663 precision_updated = true;
664 }
665 }
666
667 if precision_updated {
669 update_trades_precision(
670 &mut trades,
671 price_precision,
672 size_precision,
673 current_price_precision,
674 current_size_precision,
675 );
676 }
677
678 let size = Quantity::new_checked(data.amount, current_size_precision)?;
679
680 if size.is_positive() {
681 let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
682
683 trades.push(trade);
684
685 if let Some(limit) = limit
686 && trades.len() >= limit
687 {
688 break;
689 }
690 } else {
691 log::warn!("Skipping zero-sized trade: {data:?}");
692 }
693 }
694
695 Ok(trades)
696}
697
698pub fn load_funding_rates<P: AsRef<Path>>(
708 filepath: P,
709 instrument_id: Option<InstrumentId>,
710 limit: Option<usize>,
711) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
712 let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
714 let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
715
716 let mut reader = create_csv_reader(filepath)?;
717 let mut record = StringRecord::new();
718
719 while reader.read_record(&mut record)? {
720 let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
721
722 if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
724 funding_rates.push(funding_rate);
725
726 if let Some(limit) = limit
727 && funding_rates.len() >= limit
728 {
729 break;
730 }
731 }
732 }
733
734 Ok(funding_rates)
735}
736
737#[cfg(test)]
741mod tests {
742 use nautilus_model::{
743 enums::{AggressorSide, BookAction},
744 identifiers::{InstrumentId, TradeId},
745 types::{Price, Quantity},
746 };
747 use nautilus_testkit::common::{
748 ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
749 ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
750 ensure_data_exists_tardis_huobi_quotes,
751 };
752 use rstest::*;
753
754 use super::*;
755 use crate::{parse::parse_price, tests::get_test_data_path};
756
757 #[rstest]
758 #[case(0.0, 0)]
759 #[case(42.0, 0)]
760 #[case(0.1, 1)]
761 #[case(0.25, 2)]
762 #[case(123.0001, 4)]
763 #[case(-42.987654321, 9)]
764 #[case(1.234_567_890_123, 12)]
765 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
766 assert_eq!(infer_precision(input), expected);
767 }
768
769 #[rstest]
770 pub fn test_dynamic_precision_inference() {
771 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
772binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
773binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
774binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
775binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
776binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
777
778 let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
779 std::fs::write(&temp_file, csv_data).unwrap();
780
781 let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
782
783 assert_eq!(deltas.len(), 5);
784
785 for (i, delta) in deltas.iter().enumerate() {
786 assert_eq!(
787 delta.order.price.precision, 4,
788 "Price precision should be 4 for delta {i}",
789 );
790 assert_eq!(
791 delta.order.size.precision, 1,
792 "Size precision should be 1 for delta {i}",
793 );
794 }
795
796 assert_eq!(deltas[0].order.price, parse_price(50000.0, 4));
798 assert_eq!(deltas[0].order.size, Quantity::new(1.0, 1));
799
800 assert_eq!(deltas[1].order.price, parse_price(49999.5, 4));
801 assert_eq!(deltas[1].order.size, Quantity::new(2.0, 1));
802
803 assert_eq!(deltas[2].order.price, parse_price(50000.12, 4));
804 assert_eq!(deltas[2].order.size, Quantity::new(1.5, 1));
805
806 assert_eq!(deltas[3].order.price, parse_price(49999.123, 4));
807 assert_eq!(deltas[3].order.size, Quantity::new(3.0, 1));
808
809 assert_eq!(deltas[4].order.price, parse_price(50000.1234, 4));
810 assert_eq!(deltas[4].order.size, Quantity::new(0.5, 1));
811
812 assert_eq!(
813 deltas[0].order.price.precision,
814 deltas[4].order.price.precision
815 );
816 assert_eq!(
817 deltas[0].order.size.precision,
818 deltas[2].order.size.precision
819 );
820
821 std::fs::remove_file(&temp_file).ok();
822 }
823
824 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
826 #[rstest]
827 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
830 #[case] price_precision: Option<u8>,
831 #[case] size_precision: Option<u8>,
832 ) {
833 let filepath = ensure_data_exists_tardis_deribit_book_l2();
834 let deltas = load_deltas(
835 filepath,
836 price_precision,
837 size_precision,
838 None,
839 Some(10_000),
840 )
841 .unwrap();
842
843 assert_eq!(deltas.len(), 10_000);
844 assert_eq!(
845 deltas[0].instrument_id,
846 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
847 );
848 assert_eq!(deltas[0].action, BookAction::Add);
849 assert_eq!(deltas[0].order.side, OrderSide::Sell);
850 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
851 assert_eq!(deltas[0].order.size, Quantity::from("18640"));
852 assert_eq!(deltas[0].flags, 0);
853 assert_eq!(deltas[0].sequence, 0);
854 assert_eq!(deltas[0].ts_event, 1585699200245000000);
855 assert_eq!(deltas[0].ts_init, 1585699200355684000);
856 }
857
858 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
860 #[rstest]
861 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
864 #[case] price_precision: Option<u8>,
865 #[case] size_precision: Option<u8>,
866 ) {
867 let filepath = ensure_data_exists_tardis_binance_snapshot5();
868 let depths = load_depth10_from_snapshot5(
869 filepath,
870 price_precision,
871 size_precision,
872 None,
873 Some(10_000),
874 )
875 .unwrap();
876
877 assert_eq!(depths.len(), 10_000);
878 assert_eq!(
879 depths[0].instrument_id,
880 InstrumentId::from("BTCUSDT.BINANCE")
881 );
882 assert_eq!(depths[0].bids.len(), 10);
883 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
884 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
885 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
886 assert_eq!(depths[0].bids[0].order_id, 0);
887 assert_eq!(depths[0].asks.len(), 10);
888 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
889 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
890 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
891 assert_eq!(depths[0].asks[0].order_id, 0);
892 assert_eq!(depths[0].bid_counts[0], 1);
893 assert_eq!(depths[0].ask_counts[0], 1);
894 assert_eq!(depths[0].flags, 128);
895 assert_eq!(depths[0].ts_event, 1598918403696000000);
896 assert_eq!(depths[0].ts_init, 1598918403810979000);
897 assert_eq!(depths[0].sequence, 0);
898 }
899
900 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
902 #[rstest]
903 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
906 #[case] price_precision: Option<u8>,
907 #[case] size_precision: Option<u8>,
908 ) {
909 let filepath = ensure_data_exists_tardis_binance_snapshot25();
910 let depths = load_depth10_from_snapshot25(
911 filepath,
912 price_precision,
913 size_precision,
914 None,
915 Some(10_000),
916 )
917 .unwrap();
918
919 assert_eq!(depths.len(), 10_000);
920 assert_eq!(
921 depths[0].instrument_id,
922 InstrumentId::from("BTCUSDT.BINANCE")
923 );
924 assert_eq!(depths[0].bids.len(), 10);
925 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
926 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
927 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
928 assert_eq!(depths[0].bids[0].order_id, 0);
929 assert_eq!(depths[0].asks.len(), 10);
930 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
931 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
932 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
933 assert_eq!(depths[0].asks[0].order_id, 0);
934 assert_eq!(depths[0].bid_counts[0], 1);
935 assert_eq!(depths[0].ask_counts[0], 1);
936 assert_eq!(depths[0].flags, 128);
937 assert_eq!(depths[0].ts_event, 1598918403696000000);
938 assert_eq!(depths[0].ts_init, 1598918403810979000);
939 assert_eq!(depths[0].sequence, 0);
940 }
941
942 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
944 #[rstest]
945 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
948 #[case] price_precision: Option<u8>,
949 #[case] size_precision: Option<u8>,
950 ) {
951 let filepath = ensure_data_exists_tardis_huobi_quotes();
952 let quotes = load_quotes(
953 filepath,
954 price_precision,
955 size_precision,
956 None,
957 Some(10_000),
958 )
959 .unwrap();
960
961 assert_eq!(quotes.len(), 10_000);
962 assert_eq!(
963 quotes[0].instrument_id,
964 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
965 );
966 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
967 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
968 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
969 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
970 assert_eq!(quotes[0].ts_event, 1588291201099000000);
971 assert_eq!(quotes[0].ts_init, 1588291201234268000);
972 }
973
974 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
976 #[rstest]
977 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
980 #[case] price_precision: Option<u8>,
981 #[case] size_precision: Option<u8>,
982 ) {
983 let filepath = ensure_data_exists_tardis_bitmex_trades();
984 let trades = load_trades(
985 filepath,
986 price_precision,
987 size_precision,
988 None,
989 Some(10_000),
990 )
991 .unwrap();
992
993 assert_eq!(trades.len(), 10_000);
994 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
995 assert_eq!(trades[0].price, Price::from("8531.5"));
996 assert_eq!(trades[0].size, Quantity::from("2152"));
997 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
998 assert_eq!(
999 trades[0].trade_id,
1000 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1001 );
1002 assert_eq!(trades[0].ts_event, 1583020803145000000);
1003 assert_eq!(trades[0].ts_init, 1583020803307160000);
1004 }
1005
1006 #[rstest]
1007 pub fn test_load_trades_with_zero_sized_trade() {
1008 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1010binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1011binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1012binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1013binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1014
1015 let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1016 std::fs::write(&temp_file, csv_data).unwrap();
1017
1018 let trades = load_trades(
1019 &temp_file,
1020 Some(4),
1021 Some(1),
1022 None,
1023 None, )
1025 .unwrap();
1026
1027 assert_eq!(trades.len(), 3);
1029
1030 assert_eq!(trades[0].size, Quantity::from("1.0"));
1032 assert_eq!(trades[1].size, Quantity::from("1.5"));
1033 assert_eq!(trades[2].size, Quantity::from("3.0"));
1034
1035 assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1037 assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1038 assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1039
1040 std::fs::remove_file(&temp_file).ok();
1041 }
1042
1043 #[rstest]
1044 pub fn test_load_trades_from_local_file() {
1045 let filepath = get_test_data_path("csv/trades_1.csv");
1046 let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1047 assert_eq!(trades.len(), 2);
1048 assert_eq!(trades[0].price, Price::from("8531.5"));
1049 assert_eq!(trades[1].size, Quantity::from("1000"));
1050 }
1051
1052 #[rstest]
1053 pub fn test_load_deltas_from_local_file() {
1054 let filepath = get_test_data_path("csv/deltas_1.csv");
1055 let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1056 assert_eq!(deltas.len(), 2);
1057 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
1058 assert_eq!(deltas[1].order.size, Quantity::from("10000"));
1059 }
1060
1061 #[rstest]
1062 fn test_load_depth10_from_snapshot5_comprehensive() {
1063 let filepath = ensure_data_exists_tardis_binance_snapshot5();
1064 let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1065
1066 assert_eq!(depths.len(), 100);
1067
1068 let first = &depths[0];
1069 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1070 assert_eq!(first.bids.len(), 10);
1071 assert_eq!(first.asks.len(), 10);
1072
1073 assert_eq!(first.bids[0].price, Price::from("11657.07"));
1075 assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1076 assert_eq!(first.bids[0].side, OrderSide::Buy);
1077
1078 assert_eq!(first.bids[1].price, Price::from("11656.97"));
1079 assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1080 assert_eq!(first.bids[1].side, OrderSide::Buy);
1081
1082 assert_eq!(first.bids[2].price, Price::from("11655.78"));
1083 assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1084 assert_eq!(first.bids[2].side, OrderSide::Buy);
1085
1086 assert_eq!(first.bids[3].price, Price::from("11655.77"));
1087 assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1088 assert_eq!(first.bids[3].side, OrderSide::Buy);
1089
1090 assert_eq!(first.bids[4].price, Price::from("11655.68"));
1091 assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1092 assert_eq!(first.bids[4].side, OrderSide::Buy);
1093
1094 for i in 5..10 {
1096 assert_eq!(first.bids[i].price.raw, 0);
1097 assert_eq!(first.bids[i].size.raw, 0);
1098 assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1099 }
1100
1101 assert_eq!(first.asks[0].price, Price::from("11657.08"));
1103 assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1104 assert_eq!(first.asks[0].side, OrderSide::Sell);
1105
1106 assert_eq!(first.asks[1].price, Price::from("11657.54"));
1107 assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1108 assert_eq!(first.asks[1].side, OrderSide::Sell);
1109
1110 assert_eq!(first.asks[2].price, Price::from("11657.56"));
1111 assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1112 assert_eq!(first.asks[2].side, OrderSide::Sell);
1113
1114 assert_eq!(first.asks[3].price, Price::from("11657.61"));
1115 assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1116 assert_eq!(first.asks[3].side, OrderSide::Sell);
1117
1118 assert_eq!(first.asks[4].price, Price::from("11657.92"));
1119 assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1120 assert_eq!(first.asks[4].side, OrderSide::Sell);
1121
1122 for i in 5..10 {
1124 assert_eq!(first.asks[i].price.raw, 0);
1125 assert_eq!(first.asks[i].size.raw, 0);
1126 assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1127 }
1128
1129 for i in 1..5 {
1131 assert!(
1132 first.bids[i].price < first.bids[i - 1].price,
1133 "Bid price at level {} should be less than level {}",
1134 i,
1135 i - 1
1136 );
1137 }
1138
1139 for i in 1..5 {
1141 assert!(
1142 first.asks[i].price > first.asks[i - 1].price,
1143 "Ask price at level {} should be greater than level {}",
1144 i,
1145 i - 1
1146 );
1147 }
1148
1149 assert!(
1151 first.asks[0].price > first.bids[0].price,
1152 "Best ask should be greater than best bid"
1153 );
1154
1155 for i in 0..5 {
1157 assert_eq!(first.bid_counts[i], 1);
1158 assert_eq!(first.ask_counts[i], 1);
1159 }
1160 for i in 5..10 {
1161 assert_eq!(first.bid_counts[i], 0);
1162 assert_eq!(first.ask_counts[i], 0);
1163 }
1164
1165 assert_eq!(first.flags, 128); assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1168 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1169 assert_eq!(first.sequence, 0);
1170 }
1171
1172 #[rstest]
1173 fn test_load_depth10_from_snapshot25_comprehensive() {
1174 let filepath = ensure_data_exists_tardis_binance_snapshot25();
1175 let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1176
1177 assert_eq!(depths.len(), 100);
1178
1179 let first = &depths[0];
1180 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1181 assert_eq!(first.bids.len(), 10);
1182 assert_eq!(first.asks.len(), 10);
1183
1184 let expected_bids = vec![
1186 ("11657.07", "10.896"),
1187 ("11656.97", "0.2"),
1188 ("11655.78", "0.2"),
1189 ("11655.77", "0.98"),
1190 ("11655.68", "0.111"),
1191 ("11655.66", "0.077"),
1192 ("11655.57", "0.34"),
1193 ("11655.48", "0.4"),
1194 ("11655.26", "1.185"),
1195 ("11654.86", "0.195"),
1196 ];
1197
1198 for (i, (price, size)) in expected_bids.iter().enumerate() {
1199 assert_eq!(first.bids[i].price, Price::from(*price));
1200 assert_eq!(first.bids[i].size, Quantity::from(*size));
1201 assert_eq!(first.bids[i].side, OrderSide::Buy);
1202 }
1203
1204 let expected_asks = vec![
1206 ("11657.08", "1.714"),
1207 ("11657.54", "5.4"),
1208 ("11657.56", "0.238"),
1209 ("11657.61", "0.077"),
1210 ("11657.92", "0.918"),
1211 ("11658.09", "1.015"),
1212 ("11658.12", "0.665"),
1213 ("11658.19", "0.583"),
1214 ("11658.28", "0.255"),
1215 ("11658.29", "0.656"),
1216 ];
1217
1218 for (i, (price, size)) in expected_asks.iter().enumerate() {
1219 assert_eq!(first.asks[i].price, Price::from(*price));
1220 assert_eq!(first.asks[i].size, Quantity::from(*size));
1221 assert_eq!(first.asks[i].side, OrderSide::Sell);
1222 }
1223
1224 for i in 1..10 {
1226 assert!(
1227 first.bids[i].price < first.bids[i - 1].price,
1228 "Bid price at level {} ({}) should be less than level {} ({})",
1229 i,
1230 first.bids[i].price,
1231 i - 1,
1232 first.bids[i - 1].price
1233 );
1234 }
1235
1236 for i in 1..10 {
1238 assert!(
1239 first.asks[i].price > first.asks[i - 1].price,
1240 "Ask price at level {} ({}) should be greater than level {} ({})",
1241 i,
1242 first.asks[i].price,
1243 i - 1,
1244 first.asks[i - 1].price
1245 );
1246 }
1247
1248 assert!(
1250 first.asks[0].price > first.bids[0].price,
1251 "Best ask ({}) should be greater than best bid ({})",
1252 first.asks[0].price,
1253 first.bids[0].price
1254 );
1255
1256 for i in 0..10 {
1258 assert_eq!(first.bid_counts[i], 1);
1259 assert_eq!(first.ask_counts[i], 1);
1260 }
1261
1262 assert_eq!(first.flags, 128); assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1265 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1266 assert_eq!(first.sequence, 0);
1267 }
1268
1269 #[rstest]
1270 fn test_snapshot_csv_field_order_interleaved() {
1271 let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1275asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1276asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1277asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1278asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1279asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1280binance-futures,BTCUSDT,1000000,2000000,\
1281100.5,1.0,100.4,2.0,\
1282100.6,1.1,100.3,2.1,\
1283100.7,1.2,100.2,2.2,\
1284100.8,1.3,100.1,2.3,\
1285100.9,1.4,100.0,2.4";
1286
1287 let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1288 std::fs::write(&temp_file, csv_data).unwrap();
1289
1290 let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1291 assert_eq!(depths.len(), 1);
1292
1293 let depth = &depths[0];
1294
1295 assert_eq!(depth.bids[0].price, Price::from("100.4"));
1297 assert_eq!(depth.bids[1].price, Price::from("100.3"));
1298 assert_eq!(depth.bids[2].price, Price::from("100.2"));
1299 assert_eq!(depth.bids[3].price, Price::from("100.1"));
1300 assert_eq!(depth.bids[4].price, Price::from("100.0"));
1301
1302 assert_eq!(depth.asks[0].price, Price::from("100.5"));
1304 assert_eq!(depth.asks[1].price, Price::from("100.6"));
1305 assert_eq!(depth.asks[2].price, Price::from("100.7"));
1306 assert_eq!(depth.asks[3].price, Price::from("100.8"));
1307 assert_eq!(depth.asks[4].price, Price::from("100.9"));
1308
1309 assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1311 assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1312
1313 std::fs::remove_file(temp_file).unwrap();
1314 }
1315}