1mod record;
17
18use std::{
19 error::Error,
20 ffi::OsStr,
21 fs::File,
22 io::{BufReader, Read, Seek, SeekFrom},
23 path::Path,
24 time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31 data::{
32 BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33 },
34 enums::{BookAction, OrderSide, RecordFlag},
35 identifiers::{InstrumentId, TradeId},
36 types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40 csv::record::{
41 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42 TardisQuoteRecord, TardisTradeRecord,
43 },
44 parse::{
45 parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46 parse_timestamp,
47 },
48};
49use crate::parse::parse_price;
50
51fn infer_precision(value: f64) -> u8 {
52 let str_value = value.to_string(); match str_value.find('.') {
54 Some(decimal_idx) => (str_value.len() - decimal_idx - 1) as u8,
55 None => 0,
56 }
57}
58
59fn create_csv_reader<P: AsRef<Path>>(
60 filepath: P,
61) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
62 let filepath_ref = filepath.as_ref();
63 const MAX_RETRIES: u8 = 3;
64 const DELAY_MS: u64 = 100;
65
66 fn open_file_with_retry<P: AsRef<Path>>(
67 path: P,
68 max_retries: u8,
69 delay_ms: u64,
70 ) -> anyhow::Result<File> {
71 let path_ref = path.as_ref();
72 for attempt in 1..=max_retries {
73 match File::open(path_ref) {
74 Ok(file) => return Ok(file),
75 Err(e) => {
76 if attempt == max_retries {
77 anyhow::bail!(
78 "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
79 );
80 }
81 eprintln!(
82 "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
83 );
84 std::thread::sleep(Duration::from_millis(delay_ms));
85 }
86 }
87 }
88 unreachable!("Loop should return either Ok or Err");
89 }
90
91 let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
92
93 let is_gzipped = filepath_ref
94 .extension()
95 .and_then(OsStr::to_str)
96 .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
97
98 if !is_gzipped {
99 let buf_reader = BufReader::new(file);
100 return Ok(ReaderBuilder::new()
101 .has_headers(true)
102 .from_reader(Box::new(buf_reader)));
103 }
104
105 let file_size = file.metadata()?.len();
106 if file_size < 2 {
107 anyhow::bail!("File too small to be a valid gzip file");
108 }
109
110 let mut header_buf = [0u8; 2];
111 for attempt in 1..=MAX_RETRIES {
112 match file.read_exact(&mut header_buf) {
113 Ok(()) => break,
114 Err(e) => {
115 if attempt == MAX_RETRIES {
116 anyhow::bail!(
117 "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
118 );
119 }
120 eprintln!(
121 "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
122 );
123 std::thread::sleep(Duration::from_millis(DELAY_MS));
124 }
125 }
126 }
127
128 if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
129 anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
130 }
131
132 for attempt in 1..=MAX_RETRIES {
133 match file.seek(SeekFrom::Start(0)) {
134 Ok(_) => break,
135 Err(e) => {
136 if attempt == MAX_RETRIES {
137 anyhow::bail!(
138 "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
139 );
140 }
141 eprintln!(
142 "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
143 );
144 std::thread::sleep(Duration::from_millis(DELAY_MS));
145 }
146 }
147 }
148
149 let buf_reader = BufReader::new(file);
150 let decoder = GzDecoder::new(buf_reader);
151
152 Ok(ReaderBuilder::new()
153 .has_headers(true)
154 .from_reader(Box::new(decoder)))
155}
156
157pub fn load_deltas<P: AsRef<Path>>(
160 filepath: P,
161 price_precision: Option<u8>,
162 size_precision: Option<u8>,
163 instrument_id: Option<InstrumentId>,
164 limit: Option<usize>,
165) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
166 let (price_precision, size_precision) = match (price_precision, size_precision) {
168 (Some(p), Some(s)) => (p, s),
169 (price_precision, size_precision) => {
170 let mut reader = create_csv_reader(&filepath)?;
171 let mut record = StringRecord::new();
172
173 let mut max_price_precision = 0u8;
174 let mut max_size_precision = 0u8;
175 let mut count = 0;
176
177 while reader.read_record(&mut record)? {
178 let parsed: TardisBookUpdateRecord = record.deserialize(None)?;
179
180 if price_precision.is_none() {
181 max_price_precision = infer_precision(parsed.price).max(max_price_precision);
182 }
183
184 if size_precision.is_none() {
185 max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
186 }
187
188 if let Some(limit) = limit {
189 if count >= limit {
190 break;
191 }
192 count += 1;
193 }
194 }
195
196 drop(reader);
197
198 max_price_precision = max_price_precision.min(FIXED_PRECISION);
199 max_size_precision = max_size_precision.min(FIXED_PRECISION);
200
201 (
202 price_precision.unwrap_or(max_price_precision),
203 size_precision.unwrap_or(max_size_precision),
204 )
205 }
206 };
207
208 let mut deltas: Vec<OrderBookDelta> = Vec::new();
209 let mut last_ts_event = UnixNanos::default();
210
211 let mut reader = create_csv_reader(filepath)?;
212 let mut record = StringRecord::new();
213
214 while reader.read_record(&mut record)? {
215 let record: TardisBookUpdateRecord = record.deserialize(None)?;
216
217 let instrument_id = match &instrument_id {
218 Some(id) => *id,
219 None => parse_instrument_id(&record.exchange, record.symbol),
220 };
221 let side = parse_order_side(&record.side);
222 let price = parse_price(record.price, price_precision);
223 let size = Quantity::new(record.amount, size_precision);
224 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
226
227 let action = parse_book_action(record.is_snapshot, size.as_f64());
228 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
231 let ts_init = parse_timestamp(record.local_timestamp);
232
233 if last_ts_event != ts_event {
235 if let Some(last_delta) = deltas.last_mut() {
236 last_delta.flags = RecordFlag::F_LAST.value();
238 }
239 }
240
241 assert!(
242 !(action != BookAction::Delete && size.is_zero()),
243 "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {record:?}"
244 );
245
246 last_ts_event = ts_event;
247
248 let delta = OrderBookDelta::new(
249 instrument_id,
250 action,
251 order,
252 flags,
253 sequence,
254 ts_event,
255 ts_init,
256 );
257
258 deltas.push(delta);
259
260 if let Some(limit) = limit {
261 if deltas.len() >= limit {
262 break;
263 }
264 }
265 }
266
267 if let Some(last_delta) = deltas.last_mut() {
269 last_delta.flags = RecordFlag::F_LAST.value();
270 }
271
272 Ok(deltas)
273}
274
275fn create_book_order(
276 side: OrderSide,
277 price: Option<f64>,
278 amount: Option<f64>,
279 price_precision: u8,
280 size_precision: u8,
281) -> (BookOrder, u32) {
282 match price {
283 Some(price) => (
284 BookOrder::new(
285 side,
286 parse_price(price, price_precision),
287 Quantity::new(amount.unwrap_or(0.0), size_precision),
288 0,
289 ),
290 1, ),
292 None => (NULL_ORDER, 0), }
294}
295
296pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
299 filepath: P,
300 price_precision: Option<u8>,
301 size_precision: Option<u8>,
302 instrument_id: Option<InstrumentId>,
303 limit: Option<usize>,
304) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
305 let (price_precision, size_precision) = match (price_precision, size_precision) {
307 (Some(p), Some(s)) => (p, s),
308 (price_precision, size_precision) => {
309 let mut reader = create_csv_reader(&filepath)?;
310 let mut record = StringRecord::new();
311
312 let mut max_price_precision = 0u8;
313 let mut max_size_precision = 0u8;
314 let mut count = 0;
315
316 while reader.read_record(&mut record)? {
317 let parsed: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
318
319 if price_precision.is_none() {
320 if let Some(bid_price) = parsed.bids_0_price {
321 max_price_precision = infer_precision(bid_price).max(max_price_precision);
322 }
323 }
324
325 if size_precision.is_none() {
326 if let Some(bid_amount) = parsed.bids_0_amount {
327 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
328 }
329 }
330
331 if let Some(limit) = limit {
332 if count >= limit {
333 break;
334 }
335 count += 1;
336 }
337 }
338
339 drop(reader);
340
341 max_price_precision = max_price_precision.min(FIXED_PRECISION);
342 max_size_precision = max_size_precision.min(FIXED_PRECISION);
343
344 (
345 price_precision.unwrap_or(max_price_precision),
346 size_precision.unwrap_or(max_size_precision),
347 )
348 }
349 };
350
351 let mut depths: Vec<OrderBookDepth10> = Vec::new();
352
353 let mut reader = create_csv_reader(filepath)?;
354 let mut record = StringRecord::new();
355 while reader.read_record(&mut record)? {
356 let record: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
357 let instrument_id = match &instrument_id {
358 Some(id) => *id,
359 None => parse_instrument_id(&record.exchange, record.symbol),
360 };
361 let flags = RecordFlag::F_LAST.value();
362 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
364 let ts_init = parse_timestamp(record.local_timestamp);
365
366 let mut bids = [NULL_ORDER; DEPTH10_LEN];
368 let mut asks = [NULL_ORDER; DEPTH10_LEN];
369 let mut bid_counts = [0u32; DEPTH10_LEN];
370 let mut ask_counts = [0u32; DEPTH10_LEN];
371
372 for i in 0..=4 {
373 let (bid_order, bid_count) = create_book_order(
375 OrderSide::Buy,
376 match i {
377 0 => record.bids_0_price,
378 1 => record.bids_1_price,
379 2 => record.bids_2_price,
380 3 => record.bids_3_price,
381 4 => record.bids_4_price,
382 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
383 },
384 match i {
385 0 => record.bids_0_amount,
386 1 => record.bids_1_amount,
387 2 => record.bids_2_amount,
388 3 => record.bids_3_amount,
389 4 => record.bids_4_amount,
390 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
391 },
392 price_precision,
393 size_precision,
394 );
395 bids[i] = bid_order;
396 bid_counts[i] = bid_count;
397
398 let (ask_order, ask_count) = create_book_order(
400 OrderSide::Sell,
401 match i {
402 0 => record.asks_0_price,
403 1 => record.asks_1_price,
404 2 => record.asks_2_price,
405 3 => record.asks_3_price,
406 4 => record.asks_4_price,
407 _ => None, },
409 match i {
410 0 => record.asks_0_amount,
411 1 => record.asks_1_amount,
412 2 => record.asks_2_amount,
413 3 => record.asks_3_amount,
414 4 => record.asks_4_amount,
415 _ => None, },
417 price_precision,
418 size_precision,
419 );
420 asks[i] = ask_order;
421 ask_counts[i] = ask_count;
422 }
423
424 let depth = OrderBookDepth10::new(
425 instrument_id,
426 bids,
427 asks,
428 bid_counts,
429 ask_counts,
430 flags,
431 sequence,
432 ts_event,
433 ts_init,
434 );
435
436 depths.push(depth);
437
438 if let Some(limit) = limit {
439 if depths.len() >= limit {
440 break;
441 }
442 }
443 }
444
445 Ok(depths)
446}
447
448pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
451 filepath: P,
452 price_precision: Option<u8>,
453 size_precision: Option<u8>,
454 instrument_id: Option<InstrumentId>,
455 limit: Option<usize>,
456) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
457 let (price_precision, size_precision) = match (price_precision, size_precision) {
459 (Some(p), Some(s)) => (p, s),
460 (price_precision, size_precision) => {
461 let mut reader = create_csv_reader(&filepath)?;
462 let mut record = StringRecord::new();
463
464 let mut max_price_precision = 0u8;
465 let mut max_size_precision = 0u8;
466 let mut count = 0;
467
468 while reader.read_record(&mut record)? {
469 let parsed: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
470
471 if price_precision.is_none() {
472 if let Some(bid_price) = parsed.bids_0_price {
473 max_price_precision = infer_precision(bid_price).max(max_price_precision);
474 }
475 }
476
477 if size_precision.is_none() {
478 if let Some(bid_amount) = parsed.bids_0_amount {
479 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
480 }
481 }
482
483 if let Some(limit) = limit {
484 if count >= limit {
485 break;
486 }
487 count += 1;
488 }
489 }
490
491 drop(reader);
492
493 max_price_precision = max_price_precision.min(FIXED_PRECISION);
494 max_size_precision = max_size_precision.min(FIXED_PRECISION);
495
496 (
497 price_precision.unwrap_or(max_price_precision),
498 size_precision.unwrap_or(max_size_precision),
499 )
500 }
501 };
502
503 let mut depths: Vec<OrderBookDepth10> = Vec::new();
504 let mut reader = create_csv_reader(filepath)?;
505 let mut record = StringRecord::new();
506
507 while reader.read_record(&mut record)? {
508 let record: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
509
510 let instrument_id = match &instrument_id {
511 Some(id) => *id,
512 None => parse_instrument_id(&record.exchange, record.symbol),
513 };
514 let flags = RecordFlag::F_LAST.value();
515 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
517 let ts_init = parse_timestamp(record.local_timestamp);
518
519 let mut bids = [NULL_ORDER; DEPTH10_LEN];
521 let mut asks = [NULL_ORDER; DEPTH10_LEN];
522 let mut bid_counts = [0u32; DEPTH10_LEN];
523 let mut ask_counts = [0u32; DEPTH10_LEN];
524
525 for i in 0..DEPTH10_LEN {
527 let (bid_order, bid_count) = create_book_order(
529 OrderSide::Buy,
530 match i {
531 0 => record.bids_0_price,
532 1 => record.bids_1_price,
533 2 => record.bids_2_price,
534 3 => record.bids_3_price,
535 4 => record.bids_4_price,
536 5 => record.bids_5_price,
537 6 => record.bids_6_price,
538 7 => record.bids_7_price,
539 8 => record.bids_8_price,
540 9 => record.bids_9_price,
541 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
542 },
543 match i {
544 0 => record.bids_0_amount,
545 1 => record.bids_1_amount,
546 2 => record.bids_2_amount,
547 3 => record.bids_3_amount,
548 4 => record.bids_4_amount,
549 5 => record.bids_5_amount,
550 6 => record.bids_6_amount,
551 7 => record.bids_7_amount,
552 8 => record.bids_8_amount,
553 9 => record.bids_9_amount,
554 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
555 },
556 price_precision,
557 size_precision,
558 );
559 bids[i] = bid_order;
560 bid_counts[i] = bid_count;
561
562 let (ask_order, ask_count) = create_book_order(
564 OrderSide::Sell,
565 match i {
566 0 => record.asks_0_price,
567 1 => record.asks_1_price,
568 2 => record.asks_2_price,
569 3 => record.asks_3_price,
570 4 => record.asks_4_price,
571 5 => record.asks_5_price,
572 6 => record.asks_6_price,
573 7 => record.asks_7_price,
574 8 => record.asks_8_price,
575 9 => record.asks_9_price,
576 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
577 },
578 match i {
579 0 => record.asks_0_amount,
580 1 => record.asks_1_amount,
581 2 => record.asks_2_amount,
582 3 => record.asks_3_amount,
583 4 => record.asks_4_amount,
584 5 => record.asks_5_amount,
585 6 => record.asks_6_amount,
586 7 => record.asks_7_amount,
587 8 => record.asks_8_amount,
588 9 => record.asks_9_amount,
589 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
590 },
591 price_precision,
592 size_precision,
593 );
594 asks[i] = ask_order;
595 ask_counts[i] = ask_count;
596 }
597
598 let depth = OrderBookDepth10::new(
599 instrument_id,
600 bids,
601 asks,
602 bid_counts,
603 ask_counts,
604 flags,
605 sequence,
606 ts_event,
607 ts_init,
608 );
609
610 depths.push(depth);
611
612 if let Some(limit) = limit {
613 if depths.len() >= limit {
614 break;
615 }
616 }
617 }
618
619 Ok(depths)
620}
621
622pub fn load_quote_ticks<P: AsRef<Path>>(
625 filepath: P,
626 price_precision: Option<u8>,
627 size_precision: Option<u8>,
628 instrument_id: Option<InstrumentId>,
629 limit: Option<usize>,
630) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
631 let (price_precision, size_precision) = match (price_precision, size_precision) {
633 (Some(p), Some(s)) => (p, s),
634 (price_precision, size_precision) => {
635 let mut reader = create_csv_reader(&filepath)?;
636 let mut record = StringRecord::new();
637
638 let mut max_price_precision = 0u8;
639 let mut max_size_precision = 0u8;
640 let mut count = 0;
641
642 while reader.read_record(&mut record)? {
643 let parsed: TardisQuoteRecord = record.deserialize(None)?;
644
645 if price_precision.is_none() {
646 if let Some(bid_price) = parsed.bid_price {
647 max_price_precision = infer_precision(bid_price).max(max_price_precision);
648 }
649 }
650
651 if size_precision.is_none() {
652 if let Some(bid_amount) = parsed.bid_amount {
653 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
654 }
655 }
656
657 if let Some(limit) = limit {
658 if count >= limit {
659 break;
660 }
661 count += 1;
662 }
663 }
664
665 drop(reader);
666
667 max_price_precision = max_price_precision.min(FIXED_PRECISION);
668 max_size_precision = max_size_precision.min(FIXED_PRECISION);
669
670 (
671 price_precision.unwrap_or(max_price_precision),
672 size_precision.unwrap_or(max_size_precision),
673 )
674 }
675 };
676
677 let mut quotes = Vec::new();
678 let mut reader = create_csv_reader(filepath)?;
679 let mut record = StringRecord::new();
680
681 while reader.read_record(&mut record)? {
682 let record: TardisQuoteRecord = record.deserialize(None)?;
683
684 let instrument_id = match &instrument_id {
685 Some(id) => *id,
686 None => parse_instrument_id(&record.exchange, record.symbol),
687 };
688 let bid_price = parse_price(record.bid_price.unwrap_or(0.0), price_precision);
689 let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
690 let ask_price = parse_price(record.ask_price.unwrap_or(0.0), price_precision);
691 let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
692 let ts_event = parse_timestamp(record.timestamp);
693 let ts_init = parse_timestamp(record.local_timestamp);
694
695 let quote = QuoteTick::new(
696 instrument_id,
697 bid_price,
698 ask_price,
699 bid_size,
700 ask_size,
701 ts_event,
702 ts_init,
703 );
704
705 quotes.push(quote);
706
707 if let Some(limit) = limit {
708 if quotes.len() >= limit {
709 break;
710 }
711 }
712 }
713
714 Ok(quotes)
715}
716
717pub fn load_trade_ticks<P: AsRef<Path>>(
720 filepath: P,
721 price_precision: Option<u8>,
722 size_precision: Option<u8>,
723 instrument_id: Option<InstrumentId>,
724 limit: Option<usize>,
725) -> Result<Vec<TradeTick>, Box<dyn Error>> {
726 let (price_precision, size_precision) = match (price_precision, size_precision) {
728 (Some(p), Some(s)) => (p, s),
729 (price_precision, size_precision) => {
730 let mut reader = create_csv_reader(&filepath)?;
731 let mut record = StringRecord::new();
732
733 let mut max_price_precision = 0u8;
734 let mut max_size_precision = 0u8;
735 let mut count = 0;
736
737 while reader.read_record(&mut record)? {
738 let parsed: TardisTradeRecord = record.deserialize(None)?;
739
740 if price_precision.is_none() {
741 max_price_precision = infer_precision(parsed.price).max(max_price_precision);
742 }
743
744 if size_precision.is_none() {
745 max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
746 }
747
748 if let Some(limit) = limit {
749 if count >= limit {
750 break;
751 }
752 count += 1;
753 }
754 }
755
756 drop(reader);
757
758 max_price_precision = max_price_precision.min(FIXED_PRECISION);
759 max_size_precision = max_size_precision.min(FIXED_PRECISION);
760
761 (
762 price_precision.unwrap_or(max_price_precision),
763 size_precision.unwrap_or(max_size_precision),
764 )
765 }
766 };
767
768 let mut trades = Vec::new();
769 let mut reader = create_csv_reader(filepath)?;
770 let mut record = StringRecord::new();
771
772 while reader.read_record(&mut record)? {
773 let record: TardisTradeRecord = record.deserialize(None)?;
774
775 let instrument_id = match &instrument_id {
776 Some(id) => *id,
777 None => parse_instrument_id(&record.exchange, record.symbol),
778 };
779 let price = parse_price(record.price, price_precision);
780 let size = Quantity::new(record.amount, size_precision);
781 let aggressor_side = parse_aggressor_side(&record.side);
782 let trade_id = TradeId::new(&record.id);
783 let ts_event = parse_timestamp(record.timestamp);
784 let ts_init = parse_timestamp(record.local_timestamp);
785
786 let trade = TradeTick::new(
787 instrument_id,
788 price,
789 size,
790 aggressor_side,
791 trade_id,
792 ts_event,
793 ts_init,
794 );
795
796 trades.push(trade);
797
798 if let Some(limit) = limit {
799 if trades.len() >= limit {
800 break;
801 }
802 }
803 }
804
805 Ok(trades)
806}
807
808#[cfg(test)]
812mod tests {
813 use nautilus_model::{
814 enums::{AggressorSide, BookAction},
815 identifiers::InstrumentId,
816 types::Price,
817 };
818 use nautilus_testkit::common::{
819 ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
820 ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
821 ensure_data_exists_tardis_huobi_quotes,
822 };
823 use rstest::*;
824
825 use super::*;
826
827 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
829 #[rstest]
830 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
833 #[case] price_precision: Option<u8>,
834 #[case] size_precision: Option<u8>,
835 ) {
836 let filepath = ensure_data_exists_tardis_deribit_book_l2();
837 let deltas = load_deltas(
838 filepath,
839 price_precision,
840 size_precision,
841 None,
842 Some(10_000),
843 )
844 .unwrap();
845
846 assert_eq!(deltas.len(), 10_000);
847 assert_eq!(
848 deltas[0].instrument_id,
849 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
850 );
851 assert_eq!(deltas[0].action, BookAction::Add);
852 assert_eq!(deltas[0].order.side, OrderSide::Sell);
853 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
854 assert_eq!(deltas[0].order.size, Quantity::from("18640"));
855 assert_eq!(deltas[0].flags, 0);
856 assert_eq!(deltas[0].sequence, 0);
857 assert_eq!(deltas[0].ts_event, 1585699200245000000);
858 assert_eq!(deltas[0].ts_init, 1585699200355684000);
859 }
860
861 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
863 #[rstest]
864 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
867 #[case] price_precision: Option<u8>,
868 #[case] size_precision: Option<u8>,
869 ) {
870 let filepath = ensure_data_exists_tardis_binance_snapshot5();
871 let depths = load_depth10_from_snapshot5(
872 filepath,
873 price_precision,
874 size_precision,
875 None,
876 Some(10_000),
877 )
878 .unwrap();
879
880 assert_eq!(depths.len(), 10_000);
881 assert_eq!(
882 depths[0].instrument_id,
883 InstrumentId::from("BTCUSDT.BINANCE")
884 );
885 assert_eq!(depths[0].bids.len(), 10);
886 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
887 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
888 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
889 assert_eq!(depths[0].bids[0].order_id, 0);
890 assert_eq!(depths[0].asks.len(), 10);
891 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
892 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
893 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
894 assert_eq!(depths[0].asks[0].order_id, 0);
895 assert_eq!(depths[0].bid_counts[0], 1);
896 assert_eq!(depths[0].ask_counts[0], 1);
897 assert_eq!(depths[0].flags, 128);
898 assert_eq!(depths[0].ts_event, 1598918403696000000);
899 assert_eq!(depths[0].ts_init, 1598918403810979000);
900 assert_eq!(depths[0].sequence, 0);
901 }
902
903 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
905 #[rstest]
906 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
909 #[case] price_precision: Option<u8>,
910 #[case] size_precision: Option<u8>,
911 ) {
912 let filepath = ensure_data_exists_tardis_binance_snapshot25();
913 let depths = load_depth10_from_snapshot25(
914 filepath,
915 price_precision,
916 size_precision,
917 None,
918 Some(10_000),
919 )
920 .unwrap();
921
922 assert_eq!(depths.len(), 10_000);
923 assert_eq!(
924 depths[0].instrument_id,
925 InstrumentId::from("BTCUSDT.BINANCE")
926 );
927 assert_eq!(depths[0].bids.len(), 10);
928 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
929 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
930 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
931 assert_eq!(depths[0].bids[0].order_id, 0);
932 assert_eq!(depths[0].asks.len(), 10);
933 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
934 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
935 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
936 assert_eq!(depths[0].asks[0].order_id, 0);
937 assert_eq!(depths[0].bid_counts[0], 1);
938 assert_eq!(depths[0].ask_counts[0], 1);
939 assert_eq!(depths[0].flags, 128);
940 assert_eq!(depths[0].ts_event, 1598918403696000000);
941 assert_eq!(depths[0].ts_init, 1598918403810979000);
942 assert_eq!(depths[0].sequence, 0);
943 }
944
945 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
947 #[rstest]
948 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
951 #[case] price_precision: Option<u8>,
952 #[case] size_precision: Option<u8>,
953 ) {
954 let filepath = ensure_data_exists_tardis_huobi_quotes();
955 let quotes = load_quote_ticks(
956 filepath,
957 price_precision,
958 size_precision,
959 None,
960 Some(10_000),
961 )
962 .unwrap();
963
964 assert_eq!(quotes.len(), 10_000);
965 assert_eq!(
966 quotes[0].instrument_id,
967 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
968 );
969 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
970 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
971 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
972 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
973 assert_eq!(quotes[0].ts_event, 1588291201099000000);
974 assert_eq!(quotes[0].ts_init, 1588291201234268000);
975 }
976
977 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
979 #[rstest]
980 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
983 #[case] price_precision: Option<u8>,
984 #[case] size_precision: Option<u8>,
985 ) {
986 let filepath = ensure_data_exists_tardis_bitmex_trades();
987 let trades = load_trade_ticks(
988 filepath,
989 price_precision,
990 size_precision,
991 None,
992 Some(10_000),
993 )
994 .unwrap();
995
996 assert_eq!(trades.len(), 10_000);
997 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
998 assert_eq!(trades[0].price, Price::from("8531.5"));
999 assert_eq!(trades[0].size, Quantity::from("2152"));
1000 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
1001 assert_eq!(
1002 trades[0].trade_id,
1003 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1004 );
1005 assert_eq!(trades[0].ts_event, 1583020803145000000);
1006 assert_eq!(trades[0].ts_init, 1583020803307160000);
1007 }
1008}