1mod record;
17
18use std::{
19 error::Error,
20 ffi::OsStr,
21 fs::File,
22 io::{BufReader, Read, Seek, SeekFrom},
23 path::Path,
24 time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31 data::{
32 BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33 },
34 enums::{BookAction, OrderSide, RecordFlag},
35 identifiers::{InstrumentId, TradeId},
36 types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40 csv::record::{
41 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42 TardisQuoteRecord, TardisTradeRecord,
43 },
44 parse::{
45 parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46 parse_timestamp,
47 },
48};
49use crate::parse::parse_price;
50
51fn infer_precision(value: f64) -> u8 {
52 let mut buf = ryu::Buffer::new(); let s = buf.format(value);
54
55 match s.rsplit_once('.') {
56 Some((_, frac)) if frac != "0" => frac.len() as u8,
57 _ => 0,
58 }
59}
60
61fn create_csv_reader<P: AsRef<Path>>(
62 filepath: P,
63) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
64 let filepath_ref = filepath.as_ref();
65 const MAX_RETRIES: u8 = 3;
66 const DELAY_MS: u64 = 100;
67 const BUFFER_SIZE: usize = 8 * 1024 * 1024; fn open_file_with_retry<P: AsRef<Path>>(
70 path: P,
71 max_retries: u8,
72 delay_ms: u64,
73 ) -> anyhow::Result<File> {
74 let path_ref = path.as_ref();
75 for attempt in 1..=max_retries {
76 match File::open(path_ref) {
77 Ok(file) => return Ok(file),
78 Err(e) => {
79 if attempt == max_retries {
80 anyhow::bail!(
81 "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
82 );
83 }
84 eprintln!(
85 "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
86 );
87 std::thread::sleep(Duration::from_millis(delay_ms));
88 }
89 }
90 }
91 unreachable!("Loop should return either Ok or Err");
92 }
93
94 let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
95
96 let is_gzipped = filepath_ref
97 .extension()
98 .and_then(OsStr::to_str)
99 .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
100
101 if !is_gzipped {
102 let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
103 return Ok(ReaderBuilder::new()
104 .has_headers(true)
105 .buffer_capacity(1024 * 1024) .from_reader(Box::new(buf_reader)));
107 }
108
109 let file_size = file.metadata()?.len();
110 if file_size < 2 {
111 anyhow::bail!("File too small to be a valid gzip file");
112 }
113
114 let mut header_buf = [0u8; 2];
115 for attempt in 1..=MAX_RETRIES {
116 match file.read_exact(&mut header_buf) {
117 Ok(()) => break,
118 Err(e) => {
119 if attempt == MAX_RETRIES {
120 anyhow::bail!(
121 "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
122 );
123 }
124 eprintln!(
125 "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
126 );
127 std::thread::sleep(Duration::from_millis(DELAY_MS));
128 }
129 }
130 }
131
132 if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
133 anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
134 }
135
136 for attempt in 1..=MAX_RETRIES {
137 match file.seek(SeekFrom::Start(0)) {
138 Ok(_) => break,
139 Err(e) => {
140 if attempt == MAX_RETRIES {
141 anyhow::bail!(
142 "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
143 );
144 }
145 eprintln!(
146 "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
147 );
148 std::thread::sleep(Duration::from_millis(DELAY_MS));
149 }
150 }
151 }
152
153 let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
154 let decoder = GzDecoder::new(buf_reader);
155
156 Ok(ReaderBuilder::new()
157 .has_headers(true)
158 .buffer_capacity(1024 * 1024) .from_reader(Box::new(decoder)))
160}
161
162pub fn load_deltas<P: AsRef<Path>>(
173 filepath: P,
174 price_precision: Option<u8>,
175 size_precision: Option<u8>,
176 instrument_id: Option<InstrumentId>,
177 limit: Option<usize>,
178) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
179 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
181 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
182
183 let mut current_price_precision = price_precision.unwrap_or(0);
184 let mut current_size_precision = size_precision.unwrap_or(0);
185 let mut last_ts_event = UnixNanos::default();
186
187 let mut reader = create_csv_reader(filepath)?;
188 let mut record = StringRecord::new();
189
190 while reader.read_record(&mut record)? {
191 let data: TardisBookUpdateRecord = record.deserialize(None)?;
192
193 let mut precision_updated = false;
195
196 if price_precision.is_none() {
197 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
198 if inferred_price_precision > current_price_precision {
199 current_price_precision = inferred_price_precision;
200 precision_updated = true;
201 }
202 }
203
204 if size_precision.is_none() {
205 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
206 if inferred_size_precision > current_size_precision {
207 current_size_precision = inferred_size_precision;
208 precision_updated = true;
209 }
210 }
211
212 if precision_updated {
214 for delta in deltas.iter_mut() {
215 if price_precision.is_none() {
216 delta.order.price.precision = current_price_precision;
217 }
218 if size_precision.is_none() {
219 delta.order.size.precision = current_size_precision;
220 }
221 }
222 }
223
224 let instrument_id = match &instrument_id {
225 Some(id) => *id,
226 None => parse_instrument_id(&data.exchange, data.symbol),
227 };
228 let side = parse_order_side(&data.side);
229 let price = parse_price(data.price, current_price_precision);
230 let size = Quantity::new(data.amount, current_size_precision);
231 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
233
234 let action = parse_book_action(data.is_snapshot, size.as_f64());
235 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
238 let ts_init = parse_timestamp(data.local_timestamp);
239
240 if last_ts_event != ts_event
242 && let Some(last_delta) = deltas.last_mut()
243 {
244 last_delta.flags = RecordFlag::F_LAST.value();
246 }
247
248 assert!(
249 !(action != BookAction::Delete && size.is_zero()),
250 "Invalid delta: action {action} when size zero, check size_precision ({current_size_precision}) vs data; {data:?}"
251 );
252
253 last_ts_event = ts_event;
254
255 let delta = OrderBookDelta::new(
256 instrument_id,
257 action,
258 order,
259 flags,
260 sequence,
261 ts_event,
262 ts_init,
263 );
264
265 deltas.push(delta);
266
267 if let Some(limit) = limit
268 && deltas.len() >= limit
269 {
270 break;
271 }
272 }
273
274 if let Some(last_delta) = deltas.last_mut() {
276 last_delta.flags = RecordFlag::F_LAST.value();
277 }
278
279 Ok(deltas)
280}
281
282fn create_book_order(
283 side: OrderSide,
284 price: Option<f64>,
285 amount: Option<f64>,
286 price_precision: u8,
287 size_precision: u8,
288) -> (BookOrder, u32) {
289 match price {
290 Some(price) => (
291 BookOrder::new(
292 side,
293 parse_price(price, price_precision),
294 Quantity::new(amount.unwrap_or(0.0), size_precision),
295 0,
296 ),
297 1, ),
299 None => (NULL_ORDER, 0), }
301}
302
303pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
314 filepath: P,
315 price_precision: Option<u8>,
316 size_precision: Option<u8>,
317 instrument_id: Option<InstrumentId>,
318 limit: Option<usize>,
319) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
320 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
322 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
323
324 let mut current_price_precision = price_precision.unwrap_or(0);
325 let mut current_size_precision = size_precision.unwrap_or(0);
326
327 let mut reader = create_csv_reader(filepath)?;
328 let mut record = StringRecord::new();
329
330 while reader.read_record(&mut record)? {
331 let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
332
333 let mut precision_updated = false;
335
336 if price_precision.is_none() {
337 if let Some(bid_price) = data.bids_0_price {
338 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
339 if inferred_price_precision > current_price_precision {
340 current_price_precision = inferred_price_precision;
341 precision_updated = true;
342 }
343 }
344 }
345
346 if size_precision.is_none() {
347 if let Some(bid_amount) = data.bids_0_amount {
348 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
349 if inferred_size_precision > current_size_precision {
350 current_size_precision = inferred_size_precision;
351 precision_updated = true;
352 }
353 }
354 }
355
356 if precision_updated {
358 for depth in depths.iter_mut() {
359 for i in 0..DEPTH10_LEN {
360 if price_precision.is_none() {
361 depth.bids[i].price.precision = current_price_precision;
362 depth.asks[i].price.precision = current_price_precision;
363 }
364 if size_precision.is_none() {
365 depth.bids[i].size.precision = current_size_precision;
366 depth.asks[i].size.precision = current_size_precision;
367 }
368 }
369 }
370 }
371
372 let instrument_id = match &instrument_id {
373 Some(id) => *id,
374 None => parse_instrument_id(&data.exchange, data.symbol),
375 };
376 let flags = RecordFlag::F_LAST.value();
377 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
379 let ts_init = parse_timestamp(data.local_timestamp);
380
381 let mut bids = [NULL_ORDER; DEPTH10_LEN];
383 let mut asks = [NULL_ORDER; DEPTH10_LEN];
384 let mut bid_counts = [0u32; DEPTH10_LEN];
385 let mut ask_counts = [0u32; DEPTH10_LEN];
386
387 for i in 0..=4 {
388 let (bid_order, bid_count) = create_book_order(
390 OrderSide::Buy,
391 match i {
392 0 => data.bids_0_price,
393 1 => data.bids_1_price,
394 2 => data.bids_2_price,
395 3 => data.bids_3_price,
396 4 => data.bids_4_price,
397 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
398 },
399 match i {
400 0 => data.bids_0_amount,
401 1 => data.bids_1_amount,
402 2 => data.bids_2_amount,
403 3 => data.bids_3_amount,
404 4 => data.bids_4_amount,
405 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
406 },
407 current_price_precision,
408 current_size_precision,
409 );
410 bids[i] = bid_order;
411 bid_counts[i] = bid_count;
412
413 let (ask_order, ask_count) = create_book_order(
415 OrderSide::Sell,
416 match i {
417 0 => data.asks_0_price,
418 1 => data.asks_1_price,
419 2 => data.asks_2_price,
420 3 => data.asks_3_price,
421 4 => data.asks_4_price,
422 _ => None, },
424 match i {
425 0 => data.asks_0_amount,
426 1 => data.asks_1_amount,
427 2 => data.asks_2_amount,
428 3 => data.asks_3_amount,
429 4 => data.asks_4_amount,
430 _ => None, },
432 current_price_precision,
433 current_size_precision,
434 );
435 asks[i] = ask_order;
436 ask_counts[i] = ask_count;
437 }
438
439 let depth = OrderBookDepth10::new(
440 instrument_id,
441 bids,
442 asks,
443 bid_counts,
444 ask_counts,
445 flags,
446 sequence,
447 ts_event,
448 ts_init,
449 );
450
451 depths.push(depth);
452
453 if let Some(limit) = limit
454 && depths.len() >= limit
455 {
456 break;
457 }
458 }
459
460 Ok(depths)
461}
462
463pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
474 filepath: P,
475 price_precision: Option<u8>,
476 size_precision: Option<u8>,
477 instrument_id: Option<InstrumentId>,
478 limit: Option<usize>,
479) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
480 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
482 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
483
484 let mut current_price_precision = price_precision.unwrap_or(0);
485 let mut current_size_precision = size_precision.unwrap_or(0);
486 let mut reader = create_csv_reader(filepath)?;
487 let mut record = StringRecord::new();
488
489 while reader.read_record(&mut record)? {
490 let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
491
492 let mut precision_updated = false;
494
495 if price_precision.is_none() {
496 if let Some(bid_price) = data.bids_0_price {
497 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
498 if inferred_price_precision > current_price_precision {
499 current_price_precision = inferred_price_precision;
500 precision_updated = true;
501 }
502 }
503 }
504
505 if size_precision.is_none() {
506 if let Some(bid_amount) = data.bids_0_amount {
507 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
508 if inferred_size_precision > current_size_precision {
509 current_size_precision = inferred_size_precision;
510 precision_updated = true;
511 }
512 }
513 }
514
515 if precision_updated {
517 for depth in depths.iter_mut() {
518 for i in 0..DEPTH10_LEN {
519 if price_precision.is_none() {
520 depth.bids[i].price.precision = current_price_precision;
521 depth.asks[i].price.precision = current_price_precision;
522 }
523 if size_precision.is_none() {
524 depth.bids[i].size.precision = current_size_precision;
525 depth.asks[i].size.precision = current_size_precision;
526 }
527 }
528 }
529 }
530
531 let instrument_id = match &instrument_id {
532 Some(id) => *id,
533 None => parse_instrument_id(&data.exchange, data.symbol),
534 };
535 let flags = RecordFlag::F_LAST.value();
536 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
538 let ts_init = parse_timestamp(data.local_timestamp);
539
540 let mut bids = [NULL_ORDER; DEPTH10_LEN];
542 let mut asks = [NULL_ORDER; DEPTH10_LEN];
543 let mut bid_counts = [0u32; DEPTH10_LEN];
544 let mut ask_counts = [0u32; DEPTH10_LEN];
545
546 for i in 0..DEPTH10_LEN {
548 let (bid_order, bid_count) = create_book_order(
550 OrderSide::Buy,
551 match i {
552 0 => data.bids_0_price,
553 1 => data.bids_1_price,
554 2 => data.bids_2_price,
555 3 => data.bids_3_price,
556 4 => data.bids_4_price,
557 5 => data.bids_5_price,
558 6 => data.bids_6_price,
559 7 => data.bids_7_price,
560 8 => data.bids_8_price,
561 9 => data.bids_9_price,
562 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
563 },
564 match i {
565 0 => data.bids_0_amount,
566 1 => data.bids_1_amount,
567 2 => data.bids_2_amount,
568 3 => data.bids_3_amount,
569 4 => data.bids_4_amount,
570 5 => data.bids_5_amount,
571 6 => data.bids_6_amount,
572 7 => data.bids_7_amount,
573 8 => data.bids_8_amount,
574 9 => data.bids_9_amount,
575 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
576 },
577 current_price_precision,
578 current_size_precision,
579 );
580 bids[i] = bid_order;
581 bid_counts[i] = bid_count;
582
583 let (ask_order, ask_count) = create_book_order(
585 OrderSide::Sell,
586 match i {
587 0 => data.asks_0_price,
588 1 => data.asks_1_price,
589 2 => data.asks_2_price,
590 3 => data.asks_3_price,
591 4 => data.asks_4_price,
592 5 => data.asks_5_price,
593 6 => data.asks_6_price,
594 7 => data.asks_7_price,
595 8 => data.asks_8_price,
596 9 => data.asks_9_price,
597 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
598 },
599 match i {
600 0 => data.asks_0_amount,
601 1 => data.asks_1_amount,
602 2 => data.asks_2_amount,
603 3 => data.asks_3_amount,
604 4 => data.asks_4_amount,
605 5 => data.asks_5_amount,
606 6 => data.asks_6_amount,
607 7 => data.asks_7_amount,
608 8 => data.asks_8_amount,
609 9 => data.asks_9_amount,
610 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
611 },
612 current_price_precision,
613 current_size_precision,
614 );
615 asks[i] = ask_order;
616 ask_counts[i] = ask_count;
617 }
618
619 let depth = OrderBookDepth10::new(
620 instrument_id,
621 bids,
622 asks,
623 bid_counts,
624 ask_counts,
625 flags,
626 sequence,
627 ts_event,
628 ts_init,
629 );
630
631 depths.push(depth);
632
633 if let Some(limit) = limit
634 && depths.len() >= limit
635 {
636 break;
637 }
638 }
639
640 Ok(depths)
641}
642
643pub fn load_quote_ticks<P: AsRef<Path>>(
654 filepath: P,
655 price_precision: Option<u8>,
656 size_precision: Option<u8>,
657 instrument_id: Option<InstrumentId>,
658 limit: Option<usize>,
659) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
660 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
662 let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
663
664 let mut current_price_precision = price_precision.unwrap_or(0);
665 let mut current_size_precision = size_precision.unwrap_or(0);
666 let mut reader = create_csv_reader(filepath)?;
667 let mut record = StringRecord::new();
668
669 while reader.read_record(&mut record)? {
670 let data: TardisQuoteRecord = record.deserialize(None)?;
671
672 let mut precision_updated = false;
674
675 if price_precision.is_none() {
676 if let Some(bid_price) = data.bid_price {
677 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
678 if inferred_price_precision > current_price_precision {
679 current_price_precision = inferred_price_precision;
680 precision_updated = true;
681 }
682 }
683 }
684
685 if size_precision.is_none() {
686 if let Some(bid_amount) = data.bid_amount {
687 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
688 if inferred_size_precision > current_size_precision {
689 current_size_precision = inferred_size_precision;
690 precision_updated = true;
691 }
692 }
693 }
694
695 if precision_updated {
697 for quote in quotes.iter_mut() {
698 if price_precision.is_none() {
699 quote.bid_price.precision = current_price_precision;
700 quote.ask_price.precision = current_price_precision;
701 }
702 if size_precision.is_none() {
703 quote.bid_size.precision = current_size_precision;
704 quote.ask_size.precision = current_size_precision;
705 }
706 }
707 }
708
709 let instrument_id = match &instrument_id {
710 Some(id) => *id,
711 None => parse_instrument_id(&data.exchange, data.symbol),
712 };
713 let bid_price = parse_price(data.bid_price.unwrap_or(0.0), current_price_precision);
714 let bid_size = Quantity::new(data.bid_amount.unwrap_or(0.0), current_size_precision);
715 let ask_price = parse_price(data.ask_price.unwrap_or(0.0), current_price_precision);
716 let ask_size = Quantity::new(data.ask_amount.unwrap_or(0.0), current_size_precision);
717 let ts_event = parse_timestamp(data.timestamp);
718 let ts_init = parse_timestamp(data.local_timestamp);
719
720 let quote = QuoteTick::new(
721 instrument_id,
722 bid_price,
723 ask_price,
724 bid_size,
725 ask_size,
726 ts_event,
727 ts_init,
728 );
729
730 quotes.push(quote);
731
732 if let Some(limit) = limit
733 && quotes.len() >= limit
734 {
735 break;
736 }
737 }
738
739 Ok(quotes)
740}
741
742pub fn load_trade_ticks<P: AsRef<Path>>(
753 filepath: P,
754 price_precision: Option<u8>,
755 size_precision: Option<u8>,
756 instrument_id: Option<InstrumentId>,
757 limit: Option<usize>,
758) -> Result<Vec<TradeTick>, Box<dyn Error>> {
759 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
761 let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
762
763 let mut current_price_precision = price_precision.unwrap_or(0);
764 let mut current_size_precision = size_precision.unwrap_or(0);
765 let mut reader = create_csv_reader(filepath)?;
766 let mut record = StringRecord::new();
767
768 while reader.read_record(&mut record)? {
769 let data: TardisTradeRecord = record.deserialize(None)?;
770
771 let mut precision_updated = false;
773
774 if price_precision.is_none() {
775 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
776 if inferred_price_precision > current_price_precision {
777 current_price_precision = inferred_price_precision;
778 precision_updated = true;
779 }
780 }
781
782 if size_precision.is_none() {
783 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
784 if inferred_size_precision > current_size_precision {
785 current_size_precision = inferred_size_precision;
786 precision_updated = true;
787 }
788 }
789
790 if precision_updated {
792 for trade in trades.iter_mut() {
793 if price_precision.is_none() {
794 trade.price.precision = current_price_precision;
795 }
796 if size_precision.is_none() {
797 trade.size.precision = current_size_precision;
798 }
799 }
800 }
801
802 let instrument_id = match &instrument_id {
803 Some(id) => *id,
804 None => parse_instrument_id(&data.exchange, data.symbol),
805 };
806 let price = parse_price(data.price, current_price_precision);
807 let size = Quantity::non_zero_checked(data.amount, current_size_precision)
808 .unwrap_or_else(|e| panic!("Invalid {data:?}: size {e}"));
809 let aggressor_side = parse_aggressor_side(&data.side);
810 let trade_id = TradeId::new(&data.id);
811 let ts_event = parse_timestamp(data.timestamp);
812 let ts_init = parse_timestamp(data.local_timestamp);
813
814 let trade = TradeTick::new(
815 instrument_id,
816 price,
817 size,
818 aggressor_side,
819 trade_id,
820 ts_event,
821 ts_init,
822 );
823
824 trades.push(trade);
825
826 if let Some(limit) = limit
827 && trades.len() >= limit
828 {
829 break;
830 }
831 }
832
833 Ok(trades)
834}
835
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::InstrumentId,
        types::Price,
    };
    use nautilus_testkit::common::{
        ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
        ensure_data_exists_tardis_huobi_quotes,
    };
    use rstest::*;

    use super::*;

    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321, 9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        let actual = infer_precision(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    pub fn test_dynamic_precision_inference() {
        // Prices gain one decimal place per row (0 up to 4); sizes need at most one.
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount\n\
                        binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0\n\
                        binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0\n\
                        binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5\n\
                        binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0\n\
                        binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        assert_eq!(deltas.len(), 5);

        // Every delta must end up on the highest precision seen anywhere in the file.
        for (i, delta) in deltas.iter().enumerate() {
            assert_eq!(
                delta.order.price.precision, 4,
                "Price precision should be 4 for delta {i}",
            );
            assert_eq!(
                delta.order.size.precision, 1,
                "Size precision should be 1 for delta {i}",
            );
        }

        // (price, size) expected for each row, in file order.
        let expected = [
            (50000.0, 1.0),
            (49999.5, 2.0),
            (50000.12, 1.5),
            (49999.123, 3.0),
            (50000.1234, 0.5),
        ];
        for (delta, &(price, size)) in deltas.iter().zip(expected.iter()) {
            assert_eq!(delta.order.price, parse_price(price, 4));
            assert_eq!(delta.order.size, Quantity::new(size, 1));
        }

        assert_eq!(
            deltas[0].order.price.precision,
            deltas[4].order.price.precision
        );
        assert_eq!(
            deltas[0].order.size.precision,
            deltas[2].order.size.precision
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))]
    #[case(None, None)]
    pub fn test_read_deltas(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_deribit_book_l2();
        let loaded = load_deltas(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        );
        let deltas = loaded.unwrap();

        assert_eq!(deltas.len(), 10_000);

        let first = &deltas[0];
        assert_eq!(
            first.instrument_id,
            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
        );
        assert_eq!(first.action, BookAction::Add);
        assert_eq!(first.order.side, OrderSide::Sell);
        assert_eq!(first.order.price, Price::from("6421.5"));
        assert_eq!(first.order.size, Quantity::from("18640"));
        assert_eq!(first.flags, 0);
        assert_eq!(first.sequence, 0);
        assert_eq!(first.ts_event, 1585699200245000000);
        assert_eq!(first.ts_init, 1585699200355684000);
    }

    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))]
    #[case(None, None)]
    pub fn test_read_depth10s_from_snapshot5(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot5();
        let loaded = load_depth10_from_snapshot5(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        );
        let depths = loaded.unwrap();

        assert_eq!(depths.len(), 10_000);

        let first = &depths[0];
        assert_eq!(first.instrument_id, InstrumentId::from("BTCUSDT.BINANCE"));
        assert_eq!(first.bids.len(), 10);
        assert_eq!(first.bids[0].price, Price::from("11657.07"));
        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
        assert_eq!(first.bids[0].side, OrderSide::Buy);
        assert_eq!(first.bids[0].order_id, 0);
        assert_eq!(first.asks.len(), 10);
        assert_eq!(first.asks[0].price, Price::from("11657.08"));
        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
        assert_eq!(first.asks[0].side, OrderSide::Sell);
        assert_eq!(first.asks[0].order_id, 0);
        assert_eq!(first.bid_counts[0], 1);
        assert_eq!(first.ask_counts[0], 1);
        assert_eq!(first.flags, 128);
        assert_eq!(first.ts_event, 1598918403696000000);
        assert_eq!(first.ts_init, 1598918403810979000);
        assert_eq!(first.sequence, 0);
    }

    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))]
    #[case(None, None)]
    pub fn test_read_depth10s_from_snapshot25(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot25();
        let loaded = load_depth10_from_snapshot25(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        );
        let depths = loaded.unwrap();

        assert_eq!(depths.len(), 10_000);

        let first = &depths[0];
        assert_eq!(first.instrument_id, InstrumentId::from("BTCUSDT.BINANCE"));
        assert_eq!(first.bids.len(), 10);
        assert_eq!(first.bids[0].price, Price::from("11657.07"));
        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
        assert_eq!(first.bids[0].side, OrderSide::Buy);
        assert_eq!(first.bids[0].order_id, 0);
        assert_eq!(first.asks.len(), 10);
        assert_eq!(first.asks[0].price, Price::from("11657.08"));
        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
        assert_eq!(first.asks[0].side, OrderSide::Sell);
        assert_eq!(first.asks[0].order_id, 0);
        assert_eq!(first.bid_counts[0], 1);
        assert_eq!(first.ask_counts[0], 1);
        assert_eq!(first.flags, 128);
        assert_eq!(first.ts_event, 1598918403696000000);
        assert_eq!(first.ts_init, 1598918403810979000);
        assert_eq!(first.sequence, 0);
    }

    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))]
    #[case(None, None)]
    pub fn test_read_quotes(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_huobi_quotes();
        let loaded = load_quote_ticks(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        );
        let quotes = loaded.unwrap();

        assert_eq!(quotes.len(), 10_000);

        let first = &quotes[0];
        assert_eq!(
            first.instrument_id,
            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
        );
        assert_eq!(first.bid_price, Price::from("8629.2"));
        assert_eq!(first.bid_size, Quantity::from("806"));
        assert_eq!(first.ask_price, Price::from("8629.3"));
        assert_eq!(first.ask_size, Quantity::from("5494"));
        assert_eq!(first.ts_event, 1588291201099000000);
        assert_eq!(first.ts_init, 1588291201234268000);
    }

    #[ignore = "Flaky test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))]
    #[case(None, None)]
    pub fn test_read_trades(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_bitmex_trades();
        let loaded = load_trade_ticks(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        );
        let trades = loaded.unwrap();

        assert_eq!(trades.len(), 10_000);

        let first = &trades[0];
        assert_eq!(first.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(first.price, Price::from("8531.5"));
        assert_eq!(first.size, Quantity::from("2152"));
        assert_eq!(first.aggressor_side, AggressorSide::Seller);
        assert_eq!(
            first.trade_id,
            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
        );
        assert_eq!(first.ts_event, 1583020803145000000);
        assert_eq!(first.ts_init, 1583020803307160000);
    }
}