1use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick,
24 },
25 enums::{OrderSide, RecordFlag},
26 identifiers::InstrumentId,
27 types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31 csv::{
32 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34 record::{
35 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37 },
38 },
39 parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43 if explicit.is_some() {
44 return false;
45 }
46
47 let inferred = infer_precision(value).min(FIXED_PRECISION);
48 if inferred > *current {
49 *current = inferred;
50 true
51 } else {
52 false
53 }
54}
55
56fn update_deltas_precision(
57 deltas: &mut [OrderBookDelta],
58 price_precision: Option<u8>,
59 size_precision: Option<u8>,
60 current_price_precision: u8,
61 current_size_precision: u8,
62) {
63 for delta in deltas {
64 if price_precision.is_none() {
65 delta.order.price.precision = current_price_precision;
66 }
67 if size_precision.is_none() {
68 delta.order.size.precision = current_size_precision;
69 }
70 }
71}
72
73fn update_quotes_precision(
74 quotes: &mut [QuoteTick],
75 price_precision: Option<u8>,
76 size_precision: Option<u8>,
77 current_price_precision: u8,
78 current_size_precision: u8,
79) {
80 for quote in quotes {
81 if price_precision.is_none() {
82 quote.bid_price.precision = current_price_precision;
83 quote.ask_price.precision = current_price_precision;
84 }
85 if size_precision.is_none() {
86 quote.bid_size.precision = current_size_precision;
87 quote.ask_size.precision = current_size_precision;
88 }
89 }
90}
91
92fn update_trades_precision(
93 trades: &mut [TradeTick],
94 price_precision: Option<u8>,
95 size_precision: Option<u8>,
96 current_price_precision: u8,
97 current_size_precision: u8,
98) {
99 for trade in trades {
100 if price_precision.is_none() {
101 trade.price.precision = current_price_precision;
102 }
103 if size_precision.is_none() {
104 trade.size.precision = current_size_precision;
105 }
106 }
107}
108
109pub fn load_deltas<P: AsRef<Path>>(
121 filepath: P,
122 price_precision: Option<u8>,
123 size_precision: Option<u8>,
124 instrument_id: Option<InstrumentId>,
125 limit: Option<usize>,
126) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
127 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
129 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
130
131 let mut current_price_precision = price_precision.unwrap_or(0);
132 let mut current_size_precision = size_precision.unwrap_or(0);
133 let mut last_ts_event = UnixNanos::default();
134
135 let mut reader = create_csv_reader(filepath)?;
136 let mut record = StringRecord::new();
137
138 while reader.read_record(&mut record)? {
139 let data: TardisBookUpdateRecord = record.deserialize(None)?;
140
141 let price_updated =
143 update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
144 let size_updated =
145 update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
146
147 if price_updated || size_updated {
149 update_deltas_precision(
150 &mut deltas,
151 price_precision,
152 size_precision,
153 current_price_precision,
154 current_size_precision,
155 );
156 }
157
158 let delta = parse_delta_record(
159 &data,
160 current_price_precision,
161 current_size_precision,
162 instrument_id,
163 );
164
165 let ts_event = delta.ts_event;
167 if last_ts_event != ts_event
168 && let Some(last_delta) = deltas.last_mut()
169 {
170 last_delta.flags = RecordFlag::F_LAST.value();
172 }
173
174 last_ts_event = ts_event;
175
176 deltas.push(delta);
177
178 if let Some(limit) = limit
179 && deltas.len() >= limit
180 {
181 break;
182 }
183 }
184
185 if let Some(last_delta) = deltas.last_mut() {
187 last_delta.flags = RecordFlag::F_LAST.value();
188 }
189
190 Ok(deltas)
191}
192
193pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
205 filepath: P,
206 price_precision: Option<u8>,
207 size_precision: Option<u8>,
208 instrument_id: Option<InstrumentId>,
209 limit: Option<usize>,
210) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
211 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
213 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
214
215 let mut current_price_precision = price_precision.unwrap_or(0);
216 let mut current_size_precision = size_precision.unwrap_or(0);
217
218 let mut reader = create_csv_reader(filepath)?;
219 let mut record = StringRecord::new();
220
221 while reader.read_record(&mut record)? {
222 let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
223
224 let mut precision_updated = false;
226
227 if price_precision.is_none()
228 && let Some(bid_price) = data.bids_0_price
229 {
230 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
231 if inferred_price_precision > current_price_precision {
232 current_price_precision = inferred_price_precision;
233 precision_updated = true;
234 }
235 }
236
237 if size_precision.is_none()
238 && let Some(bid_amount) = data.bids_0_amount
239 {
240 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
241 if inferred_size_precision > current_size_precision {
242 current_size_precision = inferred_size_precision;
243 precision_updated = true;
244 }
245 }
246
247 if precision_updated {
249 for depth in &mut depths {
250 for i in 0..DEPTH10_LEN {
251 if price_precision.is_none() {
252 depth.bids[i].price.precision = current_price_precision;
253 depth.asks[i].price.precision = current_price_precision;
254 }
255 if size_precision.is_none() {
256 depth.bids[i].size.precision = current_size_precision;
257 depth.asks[i].size.precision = current_size_precision;
258 }
259 }
260 }
261 }
262
263 let instrument_id = match &instrument_id {
264 Some(id) => *id,
265 None => parse_instrument_id(&data.exchange, data.symbol),
266 };
267 let flags = RecordFlag::F_LAST.value();
268 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
270 let ts_init = parse_timestamp(data.local_timestamp);
271
272 let mut bids = [NULL_ORDER; DEPTH10_LEN];
274 let mut asks = [NULL_ORDER; DEPTH10_LEN];
275 let mut bid_counts = [0u32; DEPTH10_LEN];
276 let mut ask_counts = [0u32; DEPTH10_LEN];
277
278 for i in 0..=4 {
279 let (bid_order, bid_count) = create_book_order(
281 OrderSide::Buy,
282 match i {
283 0 => data.bids_0_price,
284 1 => data.bids_1_price,
285 2 => data.bids_2_price,
286 3 => data.bids_3_price,
287 4 => data.bids_4_price,
288 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
289 },
290 match i {
291 0 => data.bids_0_amount,
292 1 => data.bids_1_amount,
293 2 => data.bids_2_amount,
294 3 => data.bids_3_amount,
295 4 => data.bids_4_amount,
296 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
297 },
298 current_price_precision,
299 current_size_precision,
300 );
301 bids[i] = bid_order;
302 bid_counts[i] = bid_count;
303
304 let (ask_order, ask_count) = create_book_order(
306 OrderSide::Sell,
307 match i {
308 0 => data.asks_0_price,
309 1 => data.asks_1_price,
310 2 => data.asks_2_price,
311 3 => data.asks_3_price,
312 4 => data.asks_4_price,
313 _ => None, },
315 match i {
316 0 => data.asks_0_amount,
317 1 => data.asks_1_amount,
318 2 => data.asks_2_amount,
319 3 => data.asks_3_amount,
320 4 => data.asks_4_amount,
321 _ => None, },
323 current_price_precision,
324 current_size_precision,
325 );
326 asks[i] = ask_order;
327 ask_counts[i] = ask_count;
328 }
329
330 let depth = OrderBookDepth10::new(
331 instrument_id,
332 bids,
333 asks,
334 bid_counts,
335 ask_counts,
336 flags,
337 sequence,
338 ts_event,
339 ts_init,
340 );
341
342 depths.push(depth);
343
344 if let Some(limit) = limit
345 && depths.len() >= limit
346 {
347 break;
348 }
349 }
350
351 Ok(depths)
352}
353
354pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
366 filepath: P,
367 price_precision: Option<u8>,
368 size_precision: Option<u8>,
369 instrument_id: Option<InstrumentId>,
370 limit: Option<usize>,
371) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
372 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
374 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
375
376 let mut current_price_precision = price_precision.unwrap_or(0);
377 let mut current_size_precision = size_precision.unwrap_or(0);
378 let mut reader = create_csv_reader(filepath)?;
379 let mut record = StringRecord::new();
380
381 while reader.read_record(&mut record)? {
382 let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
383
384 let mut precision_updated = false;
386
387 if price_precision.is_none()
388 && let Some(bid_price) = data.bids_0_price
389 {
390 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
391 if inferred_price_precision > current_price_precision {
392 current_price_precision = inferred_price_precision;
393 precision_updated = true;
394 }
395 }
396
397 if size_precision.is_none()
398 && let Some(bid_amount) = data.bids_0_amount
399 {
400 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
401 if inferred_size_precision > current_size_precision {
402 current_size_precision = inferred_size_precision;
403 precision_updated = true;
404 }
405 }
406
407 if precision_updated {
409 for depth in &mut depths {
410 for i in 0..DEPTH10_LEN {
411 if price_precision.is_none() {
412 depth.bids[i].price.precision = current_price_precision;
413 depth.asks[i].price.precision = current_price_precision;
414 }
415 if size_precision.is_none() {
416 depth.bids[i].size.precision = current_size_precision;
417 depth.asks[i].size.precision = current_size_precision;
418 }
419 }
420 }
421 }
422
423 let instrument_id = match &instrument_id {
424 Some(id) => *id,
425 None => parse_instrument_id(&data.exchange, data.symbol),
426 };
427 let flags = RecordFlag::F_LAST.value();
428 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
430 let ts_init = parse_timestamp(data.local_timestamp);
431
432 let mut bids = [NULL_ORDER; DEPTH10_LEN];
434 let mut asks = [NULL_ORDER; DEPTH10_LEN];
435 let mut bid_counts = [0u32; DEPTH10_LEN];
436 let mut ask_counts = [0u32; DEPTH10_LEN];
437
438 for i in 0..DEPTH10_LEN {
440 let (bid_order, bid_count) = create_book_order(
442 OrderSide::Buy,
443 match i {
444 0 => data.bids_0_price,
445 1 => data.bids_1_price,
446 2 => data.bids_2_price,
447 3 => data.bids_3_price,
448 4 => data.bids_4_price,
449 5 => data.bids_5_price,
450 6 => data.bids_6_price,
451 7 => data.bids_7_price,
452 8 => data.bids_8_price,
453 9 => data.bids_9_price,
454 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
455 },
456 match i {
457 0 => data.bids_0_amount,
458 1 => data.bids_1_amount,
459 2 => data.bids_2_amount,
460 3 => data.bids_3_amount,
461 4 => data.bids_4_amount,
462 5 => data.bids_5_amount,
463 6 => data.bids_6_amount,
464 7 => data.bids_7_amount,
465 8 => data.bids_8_amount,
466 9 => data.bids_9_amount,
467 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
468 },
469 current_price_precision,
470 current_size_precision,
471 );
472 bids[i] = bid_order;
473 bid_counts[i] = bid_count;
474
475 let (ask_order, ask_count) = create_book_order(
477 OrderSide::Sell,
478 match i {
479 0 => data.asks_0_price,
480 1 => data.asks_1_price,
481 2 => data.asks_2_price,
482 3 => data.asks_3_price,
483 4 => data.asks_4_price,
484 5 => data.asks_5_price,
485 6 => data.asks_6_price,
486 7 => data.asks_7_price,
487 8 => data.asks_8_price,
488 9 => data.asks_9_price,
489 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
490 },
491 match i {
492 0 => data.asks_0_amount,
493 1 => data.asks_1_amount,
494 2 => data.asks_2_amount,
495 3 => data.asks_3_amount,
496 4 => data.asks_4_amount,
497 5 => data.asks_5_amount,
498 6 => data.asks_6_amount,
499 7 => data.asks_7_amount,
500 8 => data.asks_8_amount,
501 9 => data.asks_9_amount,
502 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
503 },
504 current_price_precision,
505 current_size_precision,
506 );
507 asks[i] = ask_order;
508 ask_counts[i] = ask_count;
509 }
510
511 let depth = OrderBookDepth10::new(
512 instrument_id,
513 bids,
514 asks,
515 bid_counts,
516 ask_counts,
517 flags,
518 sequence,
519 ts_event,
520 ts_init,
521 );
522
523 depths.push(depth);
524
525 if let Some(limit) = limit
526 && depths.len() >= limit
527 {
528 break;
529 }
530 }
531
532 Ok(depths)
533}
534
535pub fn load_quotes<P: AsRef<Path>>(
547 filepath: P,
548 price_precision: Option<u8>,
549 size_precision: Option<u8>,
550 instrument_id: Option<InstrumentId>,
551 limit: Option<usize>,
552) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
553 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
555 let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
556
557 let mut current_price_precision = price_precision.unwrap_or(0);
558 let mut current_size_precision = size_precision.unwrap_or(0);
559 let mut reader = create_csv_reader(filepath)?;
560 let mut record = StringRecord::new();
561
562 while reader.read_record(&mut record)? {
563 let data: TardisQuoteRecord = record.deserialize(None)?;
564
565 let mut precision_updated = false;
567
568 if price_precision.is_none()
569 && let Some(bid_price) = data.bid_price
570 {
571 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
572 if inferred_price_precision > current_price_precision {
573 current_price_precision = inferred_price_precision;
574 precision_updated = true;
575 }
576 }
577
578 if size_precision.is_none()
579 && let Some(bid_amount) = data.bid_amount
580 {
581 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
582 if inferred_size_precision > current_size_precision {
583 current_size_precision = inferred_size_precision;
584 precision_updated = true;
585 }
586 }
587
588 if precision_updated {
590 update_quotes_precision(
591 &mut quotes,
592 price_precision,
593 size_precision,
594 current_price_precision,
595 current_size_precision,
596 );
597 }
598
599 let quote = parse_quote_record(
600 &data,
601 current_price_precision,
602 current_size_precision,
603 instrument_id,
604 );
605
606 quotes.push(quote);
607
608 if let Some(limit) = limit
609 && quotes.len() >= limit
610 {
611 break;
612 }
613 }
614
615 Ok(quotes)
616}
617
618pub fn load_trades<P: AsRef<Path>>(
630 filepath: P,
631 price_precision: Option<u8>,
632 size_precision: Option<u8>,
633 instrument_id: Option<InstrumentId>,
634 limit: Option<usize>,
635) -> Result<Vec<TradeTick>, Box<dyn Error>> {
636 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
638 let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
639
640 let mut current_price_precision = price_precision.unwrap_or(0);
641 let mut current_size_precision = size_precision.unwrap_or(0);
642 let mut reader = create_csv_reader(filepath)?;
643 let mut record = StringRecord::new();
644
645 while reader.read_record(&mut record)? {
646 let data: TardisTradeRecord = record.deserialize(None)?;
647
648 let mut precision_updated = false;
650
651 if price_precision.is_none() {
652 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
653 if inferred_price_precision > current_price_precision {
654 current_price_precision = inferred_price_precision;
655 precision_updated = true;
656 }
657 }
658
659 if size_precision.is_none() {
660 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
661 if inferred_size_precision > current_size_precision {
662 current_size_precision = inferred_size_precision;
663 precision_updated = true;
664 }
665 }
666
667 if precision_updated {
669 update_trades_precision(
670 &mut trades,
671 price_precision,
672 size_precision,
673 current_price_precision,
674 current_size_precision,
675 );
676 }
677
678 let size = Quantity::new_checked(data.amount, current_size_precision)?;
679
680 if size.is_positive() {
681 let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
682
683 trades.push(trade);
684
685 if let Some(limit) = limit
686 && trades.len() >= limit
687 {
688 break;
689 }
690 } else {
691 log::warn!("Skipping zero-sized trade: {data:?}");
692 }
693 }
694
695 Ok(trades)
696}
697
698pub fn load_funding_rates<P: AsRef<Path>>(
708 filepath: P,
709 instrument_id: Option<InstrumentId>,
710 limit: Option<usize>,
711) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
712 let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
714 let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
715
716 let mut reader = create_csv_reader(filepath)?;
717 let mut record = StringRecord::new();
718
719 while reader.read_record(&mut record)? {
720 let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
721
722 if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
724 funding_rates.push(funding_rate);
725
726 if let Some(limit) = limit
727 && funding_rates.len() >= limit
728 {
729 break;
730 }
731 }
732 }
733
734 Ok(funding_rates)
735}
736
737#[cfg(test)]
741mod tests {
742 use nautilus_model::{
743 enums::{AggressorSide, BookAction},
744 identifiers::{InstrumentId, TradeId},
745 types::{Price, Quantity},
746 };
747 use nautilus_testkit::common::{
748 ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
749 ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
750 ensure_data_exists_tardis_huobi_quotes,
751 };
752 use rstest::*;
753
754 use super::*;
755 use crate::{parse::parse_price, tests::get_test_data_path};
756
757 #[rstest]
758 #[case(0.0, 0)]
759 #[case(42.0, 0)]
760 #[case(0.1, 1)]
761 #[case(0.25, 2)]
762 #[case(123.0001, 4)]
763 #[case(-42.987654321, 9)]
764 #[case(1.234_567_890_123, 12)]
765 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
766 assert_eq!(infer_precision(input), expected);
767 }
768
769 #[rstest]
770 pub fn test_dynamic_precision_inference() {
771 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
772binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
773binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
774binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
775binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
776binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
777
778 let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
779 std::fs::write(&temp_file, csv_data).unwrap();
780
781 let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
782
783 assert_eq!(deltas.len(), 5);
784
785 for (i, delta) in deltas.iter().enumerate() {
786 assert_eq!(
787 delta.order.price.precision, 4,
788 "Price precision should be 4 for delta {i}",
789 );
790 assert_eq!(
791 delta.order.size.precision, 1,
792 "Size precision should be 1 for delta {i}",
793 );
794 }
795
796 assert_eq!(deltas[0].order.price, parse_price(50000.0, 4));
798 assert_eq!(deltas[0].order.size, Quantity::new(1.0, 1));
799
800 assert_eq!(deltas[1].order.price, parse_price(49999.5, 4));
801 assert_eq!(deltas[1].order.size, Quantity::new(2.0, 1));
802
803 assert_eq!(deltas[2].order.price, parse_price(50000.12, 4));
804 assert_eq!(deltas[2].order.size, Quantity::new(1.5, 1));
805
806 assert_eq!(deltas[3].order.price, parse_price(49999.123, 4));
807 assert_eq!(deltas[3].order.size, Quantity::new(3.0, 1));
808
809 assert_eq!(deltas[4].order.price, parse_price(50000.1234, 4));
810 assert_eq!(deltas[4].order.size, Quantity::new(0.5, 1));
811
812 assert_eq!(
813 deltas[0].order.price.precision,
814 deltas[4].order.price.precision
815 );
816 assert_eq!(
817 deltas[0].order.size.precision,
818 deltas[2].order.size.precision
819 );
820
821 std::fs::remove_file(&temp_file).ok();
822 }
823
824 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
826 #[rstest]
827 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
830 #[case] price_precision: Option<u8>,
831 #[case] size_precision: Option<u8>,
832 ) {
833 let filepath = ensure_data_exists_tardis_deribit_book_l2();
834 let deltas = load_deltas(
835 filepath,
836 price_precision,
837 size_precision,
838 None,
839 Some(10_000),
840 )
841 .unwrap();
842
843 assert_eq!(deltas.len(), 10_000);
844 assert_eq!(
845 deltas[0].instrument_id,
846 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
847 );
848 assert_eq!(deltas[0].action, BookAction::Add);
849 assert_eq!(deltas[0].order.side, OrderSide::Sell);
850 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
851 assert_eq!(deltas[0].order.size, Quantity::from("18640"));
852 assert_eq!(deltas[0].flags, 0);
853 assert_eq!(deltas[0].sequence, 0);
854 assert_eq!(deltas[0].ts_event, 1585699200245000000);
855 assert_eq!(deltas[0].ts_init, 1585699200355684000);
856 }
857
858 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
860 #[rstest]
861 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
864 #[case] price_precision: Option<u8>,
865 #[case] size_precision: Option<u8>,
866 ) {
867 let filepath = ensure_data_exists_tardis_binance_snapshot5();
868 let depths = load_depth10_from_snapshot5(
869 filepath,
870 price_precision,
871 size_precision,
872 None,
873 Some(10_000),
874 )
875 .unwrap();
876
877 assert_eq!(depths.len(), 10_000);
878 assert_eq!(
879 depths[0].instrument_id,
880 InstrumentId::from("BTCUSDT.BINANCE")
881 );
882 assert_eq!(depths[0].bids.len(), 10);
883 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
884 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
885 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
886 assert_eq!(depths[0].bids[0].order_id, 0);
887 assert_eq!(depths[0].asks.len(), 10);
888 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
889 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
890 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
891 assert_eq!(depths[0].asks[0].order_id, 0);
892 assert_eq!(depths[0].bid_counts[0], 1);
893 assert_eq!(depths[0].ask_counts[0], 1);
894 assert_eq!(depths[0].flags, 128);
895 assert_eq!(depths[0].ts_event, 1598918403696000000);
896 assert_eq!(depths[0].ts_init, 1598918403810979000);
897 assert_eq!(depths[0].sequence, 0);
898 }
899
900 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
902 #[rstest]
903 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
906 #[case] price_precision: Option<u8>,
907 #[case] size_precision: Option<u8>,
908 ) {
909 let filepath = ensure_data_exists_tardis_binance_snapshot25();
910 let depths = load_depth10_from_snapshot25(
911 filepath,
912 price_precision,
913 size_precision,
914 None,
915 Some(10_000),
916 )
917 .unwrap();
918
919 assert_eq!(depths.len(), 10_000);
920 assert_eq!(
921 depths[0].instrument_id,
922 InstrumentId::from("BTCUSDT.BINANCE")
923 );
924 assert_eq!(depths[0].bids.len(), 10);
925 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
926 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
927 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
928 assert_eq!(depths[0].bids[0].order_id, 0);
929 assert_eq!(depths[0].asks.len(), 10);
930 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
931 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
932 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
933 assert_eq!(depths[0].asks[0].order_id, 0);
934 assert_eq!(depths[0].bid_counts[0], 1);
935 assert_eq!(depths[0].ask_counts[0], 1);
936 assert_eq!(depths[0].flags, 128);
937 assert_eq!(depths[0].ts_event, 1598918403696000000);
938 assert_eq!(depths[0].ts_init, 1598918403810979000);
939 assert_eq!(depths[0].sequence, 0);
940 }
941
942 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
944 #[rstest]
945 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
948 #[case] price_precision: Option<u8>,
949 #[case] size_precision: Option<u8>,
950 ) {
951 let filepath = ensure_data_exists_tardis_huobi_quotes();
952 let quotes = load_quotes(
953 filepath,
954 price_precision,
955 size_precision,
956 None,
957 Some(10_000),
958 )
959 .unwrap();
960
961 assert_eq!(quotes.len(), 10_000);
962 assert_eq!(
963 quotes[0].instrument_id,
964 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
965 );
966 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
967 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
968 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
969 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
970 assert_eq!(quotes[0].ts_event, 1588291201099000000);
971 assert_eq!(quotes[0].ts_init, 1588291201234268000);
972 }
973
974 #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
976 #[rstest]
977 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
980 #[case] price_precision: Option<u8>,
981 #[case] size_precision: Option<u8>,
982 ) {
983 let filepath = ensure_data_exists_tardis_bitmex_trades();
984 let trades = load_trades(
985 filepath,
986 price_precision,
987 size_precision,
988 None,
989 Some(10_000),
990 )
991 .unwrap();
992
993 assert_eq!(trades.len(), 10_000);
994 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
995 assert_eq!(trades[0].price, Price::from("8531.5"));
996 assert_eq!(trades[0].size, Quantity::from("2152"));
997 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
998 assert_eq!(
999 trades[0].trade_id,
1000 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1001 );
1002 assert_eq!(trades[0].ts_event, 1583020803145000000);
1003 assert_eq!(trades[0].ts_init, 1583020803307160000);
1004 }
1005
1006 #[rstest]
1007 pub fn test_load_trades_with_zero_sized_trade() {
1008 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1010binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1011binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1012binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1013binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1014
1015 let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1016 std::fs::write(&temp_file, csv_data).unwrap();
1017
1018 let trades = load_trades(
1019 &temp_file,
1020 Some(4),
1021 Some(1),
1022 None,
1023 None, )
1025 .unwrap();
1026
1027 assert_eq!(trades.len(), 3);
1029
1030 assert_eq!(trades[0].size, Quantity::from("1.0"));
1032 assert_eq!(trades[1].size, Quantity::from("1.5"));
1033 assert_eq!(trades[2].size, Quantity::from("3.0"));
1034
1035 assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1037 assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1038 assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1039
1040 std::fs::remove_file(&temp_file).ok();
1041 }
1042
1043 #[rstest]
1044 pub fn test_load_trades_from_local_file() {
1045 let filepath = get_test_data_path("csv/trades_1.csv");
1046 let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1047 assert_eq!(trades.len(), 2);
1048 assert_eq!(trades[0].price, Price::from("8531.5"));
1049 assert_eq!(trades[1].size, Quantity::from("1000"));
1050 }
1051
1052 #[rstest]
1053 pub fn test_load_deltas_from_local_file() {
1054 let filepath = get_test_data_path("csv/deltas_1.csv");
1055 let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1056 assert_eq!(deltas.len(), 2);
1057 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
1058 assert_eq!(deltas[1].order.size, Quantity::from("10000"));
1059 }
1060}