1mod record;
17
18use std::{
19 error::Error,
20 ffi::OsStr,
21 fs::File,
22 io::{BufReader, Read, Seek, SeekFrom},
23 path::Path,
24 time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31 data::{
32 BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33 },
34 enums::{BookAction, OrderSide, RecordFlag},
35 identifiers::{InstrumentId, TradeId},
36 types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40 csv::record::{
41 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42 TardisQuoteRecord, TardisTradeRecord,
43 },
44 parse::{
45 parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46 parse_timestamp,
47 },
48};
49use crate::parse::parse_price;
50
51fn infer_precision(value: f64) -> u8 {
52 let str_value = value.to_string(); match str_value.find('.') {
54 Some(decimal_idx) => (str_value.len() - decimal_idx - 1) as u8,
55 None => 0,
56 }
57}
58
59fn create_csv_reader<P: AsRef<Path>>(
60 filepath: P,
61) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
62 let filepath_ref = filepath.as_ref();
63 const MAX_RETRIES: u8 = 3;
64 const DELAY_MS: u64 = 100;
65
66 fn open_file_with_retry<P: AsRef<Path>>(
67 path: P,
68 max_retries: u8,
69 delay_ms: u64,
70 ) -> anyhow::Result<File> {
71 let path_ref = path.as_ref();
72 for attempt in 1..=max_retries {
73 match File::open(path_ref) {
74 Ok(file) => return Ok(file),
75 Err(e) => {
76 if attempt == max_retries {
77 anyhow::bail!(
78 "Failed to open file '{}' after {max_retries} attempts: {e}",
79 path_ref.display(),
80 );
81 }
82 eprintln!(
83 "Attempt {attempt}/{max_retries} failed to open file '{}': {e}. Retrying after {delay_ms}ms...",
84 path_ref.display(),
85 );
86 std::thread::sleep(Duration::from_millis(delay_ms));
87 }
88 }
89 }
90 unreachable!("Loop should return either Ok or Err");
91 }
92
93 let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
94
95 let is_gzipped = filepath_ref
96 .extension()
97 .and_then(OsStr::to_str)
98 .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
99
100 if !is_gzipped {
101 let buf_reader = BufReader::new(file);
102 return Ok(ReaderBuilder::new()
103 .has_headers(true)
104 .from_reader(Box::new(buf_reader)));
105 }
106
107 let file_size = file.metadata()?.len();
108 if file_size < 2 {
109 anyhow::bail!("File too small to be a valid gzip file");
110 }
111
112 let mut header_buf = [0u8; 2];
113 for attempt in 1..=MAX_RETRIES {
114 match file.read_exact(&mut header_buf) {
115 Ok(()) => break,
116 Err(e) => {
117 if attempt == MAX_RETRIES {
118 anyhow::bail!(
119 "Failed to read gzip header from '{}' after {MAX_RETRIES} attempts: {e}",
120 filepath_ref.display(),
121 );
122 }
123 eprintln!(
124 "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{}': {e}. Retrying after {DELAY_MS}ms...",
125 filepath_ref.display(),
126 );
127 std::thread::sleep(Duration::from_millis(DELAY_MS));
128 }
129 }
130 }
131
132 if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
133 anyhow::bail!(
134 "File '{}' has .gz extension but invalid gzip header",
135 filepath_ref.display(),
136 );
137 }
138
139 for attempt in 1..=MAX_RETRIES {
140 match file.seek(SeekFrom::Start(0)) {
141 Ok(_) => break,
142 Err(e) => {
143 if attempt == MAX_RETRIES {
144 anyhow::bail!(
145 "Failed to reset file position for '{}' after {MAX_RETRIES} attempts: {e}",
146 filepath_ref.display(),
147 );
148 }
149 eprintln!(
150 "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{}': {e}. Retrying after {DELAY_MS}ms...",
151 filepath_ref.display(),
152 );
153 std::thread::sleep(Duration::from_millis(DELAY_MS));
154 }
155 }
156 }
157
158 let buf_reader = BufReader::new(file);
159 let decoder = GzDecoder::new(buf_reader);
160
161 Ok(ReaderBuilder::new()
162 .has_headers(true)
163 .from_reader(Box::new(decoder)))
164}
165
166pub fn load_deltas<P: AsRef<Path>>(
169 filepath: P,
170 price_precision: Option<u8>,
171 size_precision: Option<u8>,
172 instrument_id: Option<InstrumentId>,
173 limit: Option<usize>,
174) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
175 let (price_precision, size_precision) = match (price_precision, size_precision) {
177 (Some(p), Some(s)) => (p, s),
178 (price_precision, size_precision) => {
179 let mut reader = create_csv_reader(&filepath)?;
180 let mut record = StringRecord::new();
181
182 let mut max_price_precision = 0u8;
183 let mut max_size_precision = 0u8;
184 let mut count = 0;
185
186 while reader.read_record(&mut record)? {
187 let parsed: TardisBookUpdateRecord = record.deserialize(None)?;
188
189 if price_precision.is_none() {
190 max_price_precision = infer_precision(parsed.price).max(max_price_precision);
191 }
192
193 if size_precision.is_none() {
194 max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
195 }
196
197 if let Some(limit) = limit {
198 if count >= limit {
199 break;
200 }
201 count += 1;
202 }
203 }
204
205 drop(reader);
206
207 max_price_precision = max_price_precision.min(FIXED_PRECISION);
208 max_size_precision = max_size_precision.min(FIXED_PRECISION);
209
210 (
211 price_precision.unwrap_or(max_price_precision),
212 size_precision.unwrap_or(max_size_precision),
213 )
214 }
215 };
216
217 let mut deltas: Vec<OrderBookDelta> = Vec::new();
218 let mut last_ts_event = UnixNanos::default();
219
220 let mut reader = create_csv_reader(filepath)?;
221 let mut record = StringRecord::new();
222
223 while reader.read_record(&mut record)? {
224 let record: TardisBookUpdateRecord = record.deserialize(None)?;
225
226 let instrument_id = match &instrument_id {
227 Some(id) => *id,
228 None => parse_instrument_id(&record.exchange, record.symbol),
229 };
230 let side = parse_order_side(&record.side);
231 let price = parse_price(record.price, price_precision);
232 let size = Quantity::new(record.amount, size_precision);
233 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
235
236 let action = parse_book_action(record.is_snapshot, size.as_f64());
237 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
240 let ts_init = parse_timestamp(record.local_timestamp);
241
242 if last_ts_event != ts_event {
244 if let Some(last_delta) = deltas.last_mut() {
245 last_delta.flags = RecordFlag::F_LAST.value();
247 }
248 }
249
250 assert!(
251 !(action != BookAction::Delete && size.is_zero()),
252 "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {record:?}"
253 );
254
255 last_ts_event = ts_event;
256
257 let delta = OrderBookDelta::new(
258 instrument_id,
259 action,
260 order,
261 flags,
262 sequence,
263 ts_event,
264 ts_init,
265 );
266
267 deltas.push(delta);
268
269 if let Some(limit) = limit {
270 if deltas.len() >= limit {
271 break;
272 }
273 }
274 }
275
276 if let Some(last_delta) = deltas.last_mut() {
278 last_delta.flags = RecordFlag::F_LAST.value();
279 }
280
281 Ok(deltas)
282}
283
284fn create_book_order(
285 side: OrderSide,
286 price: Option<f64>,
287 amount: Option<f64>,
288 price_precision: u8,
289 size_precision: u8,
290) -> (BookOrder, u32) {
291 match price {
292 Some(price) => (
293 BookOrder::new(
294 side,
295 parse_price(price, price_precision),
296 Quantity::new(amount.unwrap_or(0.0), size_precision),
297 0,
298 ),
299 1, ),
301 None => (NULL_ORDER, 0), }
303}
304
305pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
308 filepath: P,
309 price_precision: Option<u8>,
310 size_precision: Option<u8>,
311 instrument_id: Option<InstrumentId>,
312 limit: Option<usize>,
313) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
314 let (price_precision, size_precision) = match (price_precision, size_precision) {
316 (Some(p), Some(s)) => (p, s),
317 (price_precision, size_precision) => {
318 let mut reader = create_csv_reader(&filepath)?;
319 let mut record = StringRecord::new();
320
321 let mut max_price_precision = 0u8;
322 let mut max_size_precision = 0u8;
323 let mut count = 0;
324
325 while reader.read_record(&mut record)? {
326 let parsed: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
327
328 if price_precision.is_none() {
329 if let Some(bid_price) = parsed.bids_0_price {
330 max_price_precision = infer_precision(bid_price).max(max_price_precision);
331 }
332 }
333
334 if size_precision.is_none() {
335 if let Some(bid_amount) = parsed.bids_0_amount {
336 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
337 }
338 }
339
340 if let Some(limit) = limit {
341 if count >= limit {
342 break;
343 }
344 count += 1;
345 }
346 }
347
348 drop(reader);
349
350 max_price_precision = max_price_precision.min(FIXED_PRECISION);
351 max_size_precision = max_size_precision.min(FIXED_PRECISION);
352
353 (
354 price_precision.unwrap_or(max_price_precision),
355 size_precision.unwrap_or(max_size_precision),
356 )
357 }
358 };
359
360 let mut depths: Vec<OrderBookDepth10> = Vec::new();
361
362 let mut reader = create_csv_reader(filepath)?;
363 let mut record = StringRecord::new();
364 while reader.read_record(&mut record)? {
365 let record: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
366 let instrument_id = match &instrument_id {
367 Some(id) => *id,
368 None => parse_instrument_id(&record.exchange, record.symbol),
369 };
370 let flags = RecordFlag::F_LAST.value();
371 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
373 let ts_init = parse_timestamp(record.local_timestamp);
374
375 let mut bids = [NULL_ORDER; DEPTH10_LEN];
377 let mut asks = [NULL_ORDER; DEPTH10_LEN];
378 let mut bid_counts = [0u32; DEPTH10_LEN];
379 let mut ask_counts = [0u32; DEPTH10_LEN];
380
381 for i in 0..=4 {
382 let (bid_order, bid_count) = create_book_order(
384 OrderSide::Buy,
385 match i {
386 0 => record.bids_0_price,
387 1 => record.bids_1_price,
388 2 => record.bids_2_price,
389 3 => record.bids_3_price,
390 4 => record.bids_4_price,
391 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
392 },
393 match i {
394 0 => record.bids_0_amount,
395 1 => record.bids_1_amount,
396 2 => record.bids_2_amount,
397 3 => record.bids_3_amount,
398 4 => record.bids_4_amount,
399 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
400 },
401 price_precision,
402 size_precision,
403 );
404 bids[i] = bid_order;
405 bid_counts[i] = bid_count;
406
407 let (ask_order, ask_count) = create_book_order(
409 OrderSide::Sell,
410 match i {
411 0 => record.asks_0_price,
412 1 => record.asks_1_price,
413 2 => record.asks_2_price,
414 3 => record.asks_3_price,
415 4 => record.asks_4_price,
416 _ => None, },
418 match i {
419 0 => record.asks_0_amount,
420 1 => record.asks_1_amount,
421 2 => record.asks_2_amount,
422 3 => record.asks_3_amount,
423 4 => record.asks_4_amount,
424 _ => None, },
426 price_precision,
427 size_precision,
428 );
429 asks[i] = ask_order;
430 ask_counts[i] = ask_count;
431 }
432
433 let depth = OrderBookDepth10::new(
434 instrument_id,
435 bids,
436 asks,
437 bid_counts,
438 ask_counts,
439 flags,
440 sequence,
441 ts_event,
442 ts_init,
443 );
444
445 depths.push(depth);
446
447 if let Some(limit) = limit {
448 if depths.len() >= limit {
449 break;
450 }
451 }
452 }
453
454 Ok(depths)
455}
456
457pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
460 filepath: P,
461 price_precision: Option<u8>,
462 size_precision: Option<u8>,
463 instrument_id: Option<InstrumentId>,
464 limit: Option<usize>,
465) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
466 let (price_precision, size_precision) = match (price_precision, size_precision) {
468 (Some(p), Some(s)) => (p, s),
469 (price_precision, size_precision) => {
470 let mut reader = create_csv_reader(&filepath)?;
471 let mut record = StringRecord::new();
472
473 let mut max_price_precision = 0u8;
474 let mut max_size_precision = 0u8;
475 let mut count = 0;
476
477 while reader.read_record(&mut record)? {
478 let parsed: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
479
480 if price_precision.is_none() {
481 if let Some(bid_price) = parsed.bids_0_price {
482 max_price_precision = infer_precision(bid_price).max(max_price_precision);
483 }
484 }
485
486 if size_precision.is_none() {
487 if let Some(bid_amount) = parsed.bids_0_amount {
488 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
489 }
490 }
491
492 if let Some(limit) = limit {
493 if count >= limit {
494 break;
495 }
496 count += 1;
497 }
498 }
499
500 drop(reader);
501
502 max_price_precision = max_price_precision.min(FIXED_PRECISION);
503 max_size_precision = max_size_precision.min(FIXED_PRECISION);
504
505 (
506 price_precision.unwrap_or(max_price_precision),
507 size_precision.unwrap_or(max_size_precision),
508 )
509 }
510 };
511
512 let mut depths: Vec<OrderBookDepth10> = Vec::new();
513 let mut reader = create_csv_reader(filepath)?;
514 let mut record = StringRecord::new();
515
516 while reader.read_record(&mut record)? {
517 let record: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
518
519 let instrument_id = match &instrument_id {
520 Some(id) => *id,
521 None => parse_instrument_id(&record.exchange, record.symbol),
522 };
523 let flags = RecordFlag::F_LAST.value();
524 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
526 let ts_init = parse_timestamp(record.local_timestamp);
527
528 let mut bids = [NULL_ORDER; DEPTH10_LEN];
530 let mut asks = [NULL_ORDER; DEPTH10_LEN];
531 let mut bid_counts = [0u32; DEPTH10_LEN];
532 let mut ask_counts = [0u32; DEPTH10_LEN];
533
534 for i in 0..DEPTH10_LEN {
536 let (bid_order, bid_count) = create_book_order(
538 OrderSide::Buy,
539 match i {
540 0 => record.bids_0_price,
541 1 => record.bids_1_price,
542 2 => record.bids_2_price,
543 3 => record.bids_3_price,
544 4 => record.bids_4_price,
545 5 => record.bids_5_price,
546 6 => record.bids_6_price,
547 7 => record.bids_7_price,
548 8 => record.bids_8_price,
549 9 => record.bids_9_price,
550 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
551 },
552 match i {
553 0 => record.bids_0_amount,
554 1 => record.bids_1_amount,
555 2 => record.bids_2_amount,
556 3 => record.bids_3_amount,
557 4 => record.bids_4_amount,
558 5 => record.bids_5_amount,
559 6 => record.bids_6_amount,
560 7 => record.bids_7_amount,
561 8 => record.bids_8_amount,
562 9 => record.bids_9_amount,
563 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
564 },
565 price_precision,
566 size_precision,
567 );
568 bids[i] = bid_order;
569 bid_counts[i] = bid_count;
570
571 let (ask_order, ask_count) = create_book_order(
573 OrderSide::Sell,
574 match i {
575 0 => record.asks_0_price,
576 1 => record.asks_1_price,
577 2 => record.asks_2_price,
578 3 => record.asks_3_price,
579 4 => record.asks_4_price,
580 5 => record.asks_5_price,
581 6 => record.asks_6_price,
582 7 => record.asks_7_price,
583 8 => record.asks_8_price,
584 9 => record.asks_9_price,
585 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
586 },
587 match i {
588 0 => record.asks_0_amount,
589 1 => record.asks_1_amount,
590 2 => record.asks_2_amount,
591 3 => record.asks_3_amount,
592 4 => record.asks_4_amount,
593 5 => record.asks_5_amount,
594 6 => record.asks_6_amount,
595 7 => record.asks_7_amount,
596 8 => record.asks_8_amount,
597 9 => record.asks_9_amount,
598 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
599 },
600 price_precision,
601 size_precision,
602 );
603 asks[i] = ask_order;
604 ask_counts[i] = ask_count;
605 }
606
607 let depth = OrderBookDepth10::new(
608 instrument_id,
609 bids,
610 asks,
611 bid_counts,
612 ask_counts,
613 flags,
614 sequence,
615 ts_event,
616 ts_init,
617 );
618
619 depths.push(depth);
620
621 if let Some(limit) = limit {
622 if depths.len() >= limit {
623 break;
624 }
625 }
626 }
627
628 Ok(depths)
629}
630
631pub fn load_quote_ticks<P: AsRef<Path>>(
634 filepath: P,
635 price_precision: Option<u8>,
636 size_precision: Option<u8>,
637 instrument_id: Option<InstrumentId>,
638 limit: Option<usize>,
639) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
640 let (price_precision, size_precision) = match (price_precision, size_precision) {
642 (Some(p), Some(s)) => (p, s),
643 (price_precision, size_precision) => {
644 let mut reader = create_csv_reader(&filepath)?;
645 let mut record = StringRecord::new();
646
647 let mut max_price_precision = 0u8;
648 let mut max_size_precision = 0u8;
649 let mut count = 0;
650
651 while reader.read_record(&mut record)? {
652 let parsed: TardisQuoteRecord = record.deserialize(None)?;
653
654 if price_precision.is_none() {
655 if let Some(bid_price) = parsed.bid_price {
656 max_price_precision = infer_precision(bid_price).max(max_price_precision);
657 }
658 }
659
660 if size_precision.is_none() {
661 if let Some(bid_amount) = parsed.bid_amount {
662 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
663 }
664 }
665
666 if let Some(limit) = limit {
667 if count >= limit {
668 break;
669 }
670 count += 1;
671 }
672 }
673
674 drop(reader);
675
676 max_price_precision = max_price_precision.min(FIXED_PRECISION);
677 max_size_precision = max_size_precision.min(FIXED_PRECISION);
678
679 (
680 price_precision.unwrap_or(max_price_precision),
681 size_precision.unwrap_or(max_size_precision),
682 )
683 }
684 };
685
686 let mut quotes = Vec::new();
687 let mut reader = create_csv_reader(filepath)?;
688 let mut record = StringRecord::new();
689
690 while reader.read_record(&mut record)? {
691 let record: TardisQuoteRecord = record.deserialize(None)?;
692
693 let instrument_id = match &instrument_id {
694 Some(id) => *id,
695 None => parse_instrument_id(&record.exchange, record.symbol),
696 };
697 let bid_price = parse_price(record.bid_price.unwrap_or(0.0), price_precision);
698 let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
699 let ask_price = parse_price(record.ask_price.unwrap_or(0.0), price_precision);
700 let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
701 let ts_event = parse_timestamp(record.timestamp);
702 let ts_init = parse_timestamp(record.local_timestamp);
703
704 let quote = QuoteTick::new(
705 instrument_id,
706 bid_price,
707 ask_price,
708 bid_size,
709 ask_size,
710 ts_event,
711 ts_init,
712 );
713
714 quotes.push(quote);
715
716 if let Some(limit) = limit {
717 if quotes.len() >= limit {
718 break;
719 }
720 }
721 }
722
723 Ok(quotes)
724}
725
726pub fn load_trade_ticks<P: AsRef<Path>>(
729 filepath: P,
730 price_precision: Option<u8>,
731 size_precision: Option<u8>,
732 instrument_id: Option<InstrumentId>,
733 limit: Option<usize>,
734) -> Result<Vec<TradeTick>, Box<dyn Error>> {
735 let (price_precision, size_precision) = match (price_precision, size_precision) {
737 (Some(p), Some(s)) => (p, s),
738 (price_precision, size_precision) => {
739 let mut reader = create_csv_reader(&filepath)?;
740 let mut record = StringRecord::new();
741
742 let mut max_price_precision = 0u8;
743 let mut max_size_precision = 0u8;
744 let mut count = 0;
745
746 while reader.read_record(&mut record)? {
747 let parsed: TardisTradeRecord = record.deserialize(None)?;
748
749 if price_precision.is_none() {
750 max_price_precision = infer_precision(parsed.price).max(max_price_precision);
751 }
752
753 if size_precision.is_none() {
754 max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
755 }
756
757 if let Some(limit) = limit {
758 if count >= limit {
759 break;
760 }
761 count += 1;
762 }
763 }
764
765 drop(reader);
766
767 max_price_precision = max_price_precision.min(FIXED_PRECISION);
768 max_size_precision = max_size_precision.min(FIXED_PRECISION);
769
770 (
771 price_precision.unwrap_or(max_price_precision),
772 size_precision.unwrap_or(max_size_precision),
773 )
774 }
775 };
776
777 let mut trades = Vec::new();
778 let mut reader = create_csv_reader(filepath)?;
779 let mut record = StringRecord::new();
780
781 while reader.read_record(&mut record)? {
782 let record: TardisTradeRecord = record.deserialize(None)?;
783
784 let instrument_id = match &instrument_id {
785 Some(id) => *id,
786 None => parse_instrument_id(&record.exchange, record.symbol),
787 };
788 let price = parse_price(record.price, price_precision);
789 let size = Quantity::new(record.amount, size_precision);
790 let aggressor_side = parse_aggressor_side(&record.side);
791 let trade_id = TradeId::new(&record.id);
792 let ts_event = parse_timestamp(record.timestamp);
793 let ts_init = parse_timestamp(record.local_timestamp);
794
795 let trade = TradeTick::new(
796 instrument_id,
797 price,
798 size,
799 aggressor_side,
800 trade_id,
801 ts_event,
802 ts_init,
803 );
804
805 trades.push(trade);
806
807 if let Some(limit) = limit {
808 if trades.len() >= limit {
809 break;
810 }
811 }
812 }
813
814 Ok(trades)
815}
816
817#[cfg(test)]
821mod tests {
822 use nautilus_model::{
823 enums::{AggressorSide, BookAction},
824 identifiers::InstrumentId,
825 types::Price,
826 };
827 use nautilus_test_kit::common::{
828 ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
829 ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
830 ensure_data_exists_tardis_huobi_quotes,
831 };
832 use rstest::*;
833
834 use super::*;
835
836 #[rstest]
837 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
840 #[case] price_precision: Option<u8>,
841 #[case] size_precision: Option<u8>,
842 ) {
843 let filepath = ensure_data_exists_tardis_deribit_book_l2();
844 let deltas = load_deltas(
845 filepath,
846 price_precision,
847 size_precision,
848 None,
849 Some(10_000),
850 )
851 .unwrap();
852
853 assert_eq!(deltas.len(), 10_000);
854 assert_eq!(
855 deltas[0].instrument_id,
856 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
857 );
858 assert_eq!(deltas[0].action, BookAction::Add);
859 assert_eq!(deltas[0].order.side, OrderSide::Sell);
860 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
861 assert_eq!(deltas[0].order.size, Quantity::from("18640"));
862 assert_eq!(deltas[0].flags, 0);
863 assert_eq!(deltas[0].sequence, 0);
864 assert_eq!(deltas[0].ts_event, 1585699200245000000);
865 assert_eq!(deltas[0].ts_init, 1585699200355684000);
866 }
867
868 #[rstest]
869 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
872 #[case] price_precision: Option<u8>,
873 #[case] size_precision: Option<u8>,
874 ) {
875 let filepath = ensure_data_exists_tardis_binance_snapshot5();
876 let depths = load_depth10_from_snapshot5(
877 filepath,
878 price_precision,
879 size_precision,
880 None,
881 Some(10_000),
882 )
883 .unwrap();
884
885 assert_eq!(depths.len(), 10_000);
886 assert_eq!(
887 depths[0].instrument_id,
888 InstrumentId::from("BTCUSDT.BINANCE")
889 );
890 assert_eq!(depths[0].bids.len(), 10);
891 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
892 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
893 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
894 assert_eq!(depths[0].bids[0].order_id, 0);
895 assert_eq!(depths[0].asks.len(), 10);
896 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
897 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
898 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
899 assert_eq!(depths[0].asks[0].order_id, 0);
900 assert_eq!(depths[0].bid_counts[0], 1);
901 assert_eq!(depths[0].ask_counts[0], 1);
902 assert_eq!(depths[0].flags, 128);
903 assert_eq!(depths[0].ts_event, 1598918403696000000);
904 assert_eq!(depths[0].ts_init, 1598918403810979000);
905 assert_eq!(depths[0].sequence, 0);
906 }
907
908 #[rstest]
909 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
912 #[case] price_precision: Option<u8>,
913 #[case] size_precision: Option<u8>,
914 ) {
915 let filepath = ensure_data_exists_tardis_binance_snapshot25();
916 let depths = load_depth10_from_snapshot25(
917 filepath,
918 price_precision,
919 size_precision,
920 None,
921 Some(10_000),
922 )
923 .unwrap();
924
925 assert_eq!(depths.len(), 10_000);
926 assert_eq!(
927 depths[0].instrument_id,
928 InstrumentId::from("BTCUSDT.BINANCE")
929 );
930 assert_eq!(depths[0].bids.len(), 10);
931 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
932 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
933 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
934 assert_eq!(depths[0].bids[0].order_id, 0);
935 assert_eq!(depths[0].asks.len(), 10);
936 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
937 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
938 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
939 assert_eq!(depths[0].asks[0].order_id, 0);
940 assert_eq!(depths[0].bid_counts[0], 1);
941 assert_eq!(depths[0].ask_counts[0], 1);
942 assert_eq!(depths[0].flags, 128);
943 assert_eq!(depths[0].ts_event, 1598918403696000000);
944 assert_eq!(depths[0].ts_init, 1598918403810979000);
945 assert_eq!(depths[0].sequence, 0);
946 }
947
948 #[rstest]
949 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
952 #[case] price_precision: Option<u8>,
953 #[case] size_precision: Option<u8>,
954 ) {
955 let filepath = ensure_data_exists_tardis_huobi_quotes();
956 let quotes = load_quote_ticks(
957 filepath,
958 price_precision,
959 size_precision,
960 None,
961 Some(10_000),
962 )
963 .unwrap();
964
965 assert_eq!(quotes.len(), 10_000);
966 assert_eq!(quotes[0].instrument_id, InstrumentId::from("BTC-USD.HUOBI"));
967 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
968 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
969 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
970 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
971 assert_eq!(quotes[0].ts_event, 1588291201099000000);
972 assert_eq!(quotes[0].ts_init, 1588291201234268000);
973 }
974
975 #[rstest]
976 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
979 #[case] price_precision: Option<u8>,
980 #[case] size_precision: Option<u8>,
981 ) {
982 let filepath = ensure_data_exists_tardis_bitmex_trades();
983 let trades = load_trade_ticks(
984 filepath,
985 price_precision,
986 size_precision,
987 None,
988 Some(10_000),
989 )
990 .unwrap();
991
992 assert_eq!(trades.len(), 10_000);
993 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
994 assert_eq!(trades[0].price, Price::from("8531.5"));
995 assert_eq!(trades[0].size, Quantity::from("2152"));
996 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
997 assert_eq!(
998 trades[0].trade_id,
999 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1000 );
1001 assert_eq!(trades[0].ts_event, 1583020803145000000);
1002 assert_eq!(trades[0].ts_init, 1583020803307160000);
1003 }
1004}