1use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick,
24 },
25 enums::{OrderSide, RecordFlag},
26 identifiers::InstrumentId,
27 types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31 csv::{
32 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34 record::{
35 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37 },
38 },
39 parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43 if explicit.is_some() {
44 return false;
45 }
46
47 let inferred = infer_precision(value).min(FIXED_PRECISION);
48 if inferred > *current {
49 *current = inferred;
50 true
51 } else {
52 false
53 }
54}
55
56fn update_deltas_precision(
57 deltas: &mut [OrderBookDelta],
58 price_precision: Option<u8>,
59 size_precision: Option<u8>,
60 current_price_precision: u8,
61 current_size_precision: u8,
62) {
63 for delta in deltas {
64 if price_precision.is_none() {
65 delta.order.price.precision = current_price_precision;
66 }
67 if size_precision.is_none() {
68 delta.order.size.precision = current_size_precision;
69 }
70 }
71}
72
73fn update_quotes_precision(
74 quotes: &mut [QuoteTick],
75 price_precision: Option<u8>,
76 size_precision: Option<u8>,
77 current_price_precision: u8,
78 current_size_precision: u8,
79) {
80 for quote in quotes {
81 if price_precision.is_none() {
82 quote.bid_price.precision = current_price_precision;
83 quote.ask_price.precision = current_price_precision;
84 }
85 if size_precision.is_none() {
86 quote.bid_size.precision = current_size_precision;
87 quote.ask_size.precision = current_size_precision;
88 }
89 }
90}
91
92fn update_trades_precision(
93 trades: &mut [TradeTick],
94 price_precision: Option<u8>,
95 size_precision: Option<u8>,
96 current_price_precision: u8,
97 current_size_precision: u8,
98) {
99 for trade in trades {
100 if price_precision.is_none() {
101 trade.price.precision = current_price_precision;
102 }
103 if size_precision.is_none() {
104 trade.size.precision = current_size_precision;
105 }
106 }
107}
108
109pub fn load_deltas<P: AsRef<Path>>(
121 filepath: P,
122 price_precision: Option<u8>,
123 size_precision: Option<u8>,
124 instrument_id: Option<InstrumentId>,
125 limit: Option<usize>,
126) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
127 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
129 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
130
131 let mut current_price_precision = price_precision.unwrap_or(0);
132 let mut current_size_precision = size_precision.unwrap_or(0);
133 let mut last_ts_event = UnixNanos::default();
134 let mut last_is_snapshot = false;
135
136 let mut reader = create_csv_reader(filepath)?;
137 let mut record = StringRecord::new();
138
139 while reader.read_record(&mut record)? {
140 if let Some(limit) = limit
141 && deltas.len() >= limit
142 {
143 break;
144 }
145
146 let data: TardisBookUpdateRecord = record.deserialize(None)?;
147
148 update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
149 update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
150
151 if data.is_snapshot && !last_is_snapshot {
153 let clear_instrument_id =
154 instrument_id.unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
155 let ts_event = parse_timestamp(data.timestamp);
156 let ts_init = parse_timestamp(data.local_timestamp);
157
158 if last_ts_event != ts_event
159 && let Some(last_delta) = deltas.last_mut()
160 {
161 last_delta.flags = RecordFlag::F_LAST.value();
162 }
163 last_ts_event = ts_event;
164
165 let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
166 deltas.push(clear_delta);
167
168 if let Some(limit) = limit
169 && deltas.len() >= limit
170 {
171 break;
172 }
173 }
174 last_is_snapshot = data.is_snapshot;
175
176 let delta = match parse_delta_record(
177 &data,
178 current_price_precision,
179 current_size_precision,
180 instrument_id,
181 ) {
182 Ok(d) => d,
183 Err(e) => {
184 tracing::warn!("Skipping invalid delta record: {e}");
185 continue;
186 }
187 };
188
189 let ts_event = delta.ts_event;
190 if last_ts_event != ts_event
191 && let Some(last_delta) = deltas.last_mut()
192 {
193 last_delta.flags = RecordFlag::F_LAST.value();
194 }
195
196 last_ts_event = ts_event;
197
198 deltas.push(delta);
199 }
200
201 if let Some(last_delta) = deltas.last_mut() {
203 last_delta.flags = RecordFlag::F_LAST.value();
204 }
205
206 update_deltas_precision(
209 &mut deltas,
210 price_precision,
211 size_precision,
212 current_price_precision,
213 current_size_precision,
214 );
215
216 Ok(deltas)
217}
218
219pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
231 filepath: P,
232 price_precision: Option<u8>,
233 size_precision: Option<u8>,
234 instrument_id: Option<InstrumentId>,
235 limit: Option<usize>,
236) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
237 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
239 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
240
241 let mut current_price_precision = price_precision.unwrap_or(0);
242 let mut current_size_precision = size_precision.unwrap_or(0);
243
244 let mut reader = create_csv_reader(filepath)?;
245 let mut record = StringRecord::new();
246
247 while reader.read_record(&mut record)? {
248 let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
249
250 let mut precision_updated = false;
252
253 if price_precision.is_none()
254 && let Some(bid_price) = data.bids_0_price
255 {
256 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
257 if inferred_price_precision > current_price_precision {
258 current_price_precision = inferred_price_precision;
259 precision_updated = true;
260 }
261 }
262
263 if size_precision.is_none()
264 && let Some(bid_amount) = data.bids_0_amount
265 {
266 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
267 if inferred_size_precision > current_size_precision {
268 current_size_precision = inferred_size_precision;
269 precision_updated = true;
270 }
271 }
272
273 if precision_updated {
275 for depth in &mut depths {
276 for i in 0..DEPTH10_LEN {
277 if price_precision.is_none() {
278 depth.bids[i].price.precision = current_price_precision;
279 depth.asks[i].price.precision = current_price_precision;
280 }
281 if size_precision.is_none() {
282 depth.bids[i].size.precision = current_size_precision;
283 depth.asks[i].size.precision = current_size_precision;
284 }
285 }
286 }
287 }
288
289 let instrument_id = match &instrument_id {
290 Some(id) => *id,
291 None => parse_instrument_id(&data.exchange, data.symbol),
292 };
293 let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
295 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
297 let ts_init = parse_timestamp(data.local_timestamp);
298
299 let mut bids = [NULL_ORDER; DEPTH10_LEN];
301 let mut asks = [NULL_ORDER; DEPTH10_LEN];
302 let mut bid_counts = [0u32; DEPTH10_LEN];
303 let mut ask_counts = [0u32; DEPTH10_LEN];
304
305 for i in 0..=4 {
306 let (bid_order, bid_count) = create_book_order(
308 OrderSide::Buy,
309 match i {
310 0 => data.bids_0_price,
311 1 => data.bids_1_price,
312 2 => data.bids_2_price,
313 3 => data.bids_3_price,
314 4 => data.bids_4_price,
315 _ => unreachable!("i is constrained to 0..=4 by loop"),
316 },
317 match i {
318 0 => data.bids_0_amount,
319 1 => data.bids_1_amount,
320 2 => data.bids_2_amount,
321 3 => data.bids_3_amount,
322 4 => data.bids_4_amount,
323 _ => unreachable!("i is constrained to 0..=4 by loop"),
324 },
325 current_price_precision,
326 current_size_precision,
327 );
328 bids[i] = bid_order;
329 bid_counts[i] = bid_count;
330
331 let (ask_order, ask_count) = create_book_order(
333 OrderSide::Sell,
334 match i {
335 0 => data.asks_0_price,
336 1 => data.asks_1_price,
337 2 => data.asks_2_price,
338 3 => data.asks_3_price,
339 4 => data.asks_4_price,
340 _ => None, },
342 match i {
343 0 => data.asks_0_amount,
344 1 => data.asks_1_amount,
345 2 => data.asks_2_amount,
346 3 => data.asks_3_amount,
347 4 => data.asks_4_amount,
348 _ => None, },
350 current_price_precision,
351 current_size_precision,
352 );
353 asks[i] = ask_order;
354 ask_counts[i] = ask_count;
355 }
356
357 let depth = OrderBookDepth10::new(
358 instrument_id,
359 bids,
360 asks,
361 bid_counts,
362 ask_counts,
363 flags,
364 sequence,
365 ts_event,
366 ts_init,
367 );
368
369 depths.push(depth);
370
371 if let Some(limit) = limit
372 && depths.len() >= limit
373 {
374 break;
375 }
376 }
377
378 Ok(depths)
379}
380
381pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
389 filepath: P,
390 price_precision: Option<u8>,
391 size_precision: Option<u8>,
392 instrument_id: Option<InstrumentId>,
393 limit: Option<usize>,
394) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
395 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
397 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
398
399 let mut current_price_precision = price_precision.unwrap_or(0);
400 let mut current_size_precision = size_precision.unwrap_or(0);
401 let mut reader = create_csv_reader(filepath)?;
402 let mut record = StringRecord::new();
403
404 while reader.read_record(&mut record)? {
405 let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
406
407 let mut precision_updated = false;
409
410 if price_precision.is_none()
411 && let Some(bid_price) = data.bids_0_price
412 {
413 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
414 if inferred_price_precision > current_price_precision {
415 current_price_precision = inferred_price_precision;
416 precision_updated = true;
417 }
418 }
419
420 if size_precision.is_none()
421 && let Some(bid_amount) = data.bids_0_amount
422 {
423 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
424 if inferred_size_precision > current_size_precision {
425 current_size_precision = inferred_size_precision;
426 precision_updated = true;
427 }
428 }
429
430 if precision_updated {
432 for depth in &mut depths {
433 for i in 0..DEPTH10_LEN {
434 if price_precision.is_none() {
435 depth.bids[i].price.precision = current_price_precision;
436 depth.asks[i].price.precision = current_price_precision;
437 }
438 if size_precision.is_none() {
439 depth.bids[i].size.precision = current_size_precision;
440 depth.asks[i].size.precision = current_size_precision;
441 }
442 }
443 }
444 }
445
446 let instrument_id = match &instrument_id {
447 Some(id) => *id,
448 None => parse_instrument_id(&data.exchange, data.symbol),
449 };
450 let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
452 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
454 let ts_init = parse_timestamp(data.local_timestamp);
455
456 let mut bids = [NULL_ORDER; DEPTH10_LEN];
458 let mut asks = [NULL_ORDER; DEPTH10_LEN];
459 let mut bid_counts = [0u32; DEPTH10_LEN];
460 let mut ask_counts = [0u32; DEPTH10_LEN];
461
462 for i in 0..DEPTH10_LEN {
464 let (bid_order, bid_count) = create_book_order(
466 OrderSide::Buy,
467 match i {
468 0 => data.bids_0_price,
469 1 => data.bids_1_price,
470 2 => data.bids_2_price,
471 3 => data.bids_3_price,
472 4 => data.bids_4_price,
473 5 => data.bids_5_price,
474 6 => data.bids_6_price,
475 7 => data.bids_7_price,
476 8 => data.bids_8_price,
477 9 => data.bids_9_price,
478 _ => unreachable!("i is constrained to 0..10 by loop"),
479 },
480 match i {
481 0 => data.bids_0_amount,
482 1 => data.bids_1_amount,
483 2 => data.bids_2_amount,
484 3 => data.bids_3_amount,
485 4 => data.bids_4_amount,
486 5 => data.bids_5_amount,
487 6 => data.bids_6_amount,
488 7 => data.bids_7_amount,
489 8 => data.bids_8_amount,
490 9 => data.bids_9_amount,
491 _ => unreachable!("i is constrained to 0..10 by loop"),
492 },
493 current_price_precision,
494 current_size_precision,
495 );
496 bids[i] = bid_order;
497 bid_counts[i] = bid_count;
498
499 let (ask_order, ask_count) = create_book_order(
501 OrderSide::Sell,
502 match i {
503 0 => data.asks_0_price,
504 1 => data.asks_1_price,
505 2 => data.asks_2_price,
506 3 => data.asks_3_price,
507 4 => data.asks_4_price,
508 5 => data.asks_5_price,
509 6 => data.asks_6_price,
510 7 => data.asks_7_price,
511 8 => data.asks_8_price,
512 9 => data.asks_9_price,
513 _ => unreachable!("i is constrained to 0..10 by loop"),
514 },
515 match i {
516 0 => data.asks_0_amount,
517 1 => data.asks_1_amount,
518 2 => data.asks_2_amount,
519 3 => data.asks_3_amount,
520 4 => data.asks_4_amount,
521 5 => data.asks_5_amount,
522 6 => data.asks_6_amount,
523 7 => data.asks_7_amount,
524 8 => data.asks_8_amount,
525 9 => data.asks_9_amount,
526 _ => unreachable!("i is constrained to 0..10 by loop"),
527 },
528 current_price_precision,
529 current_size_precision,
530 );
531 asks[i] = ask_order;
532 ask_counts[i] = ask_count;
533 }
534
535 let depth = OrderBookDepth10::new(
536 instrument_id,
537 bids,
538 asks,
539 bid_counts,
540 ask_counts,
541 flags,
542 sequence,
543 ts_event,
544 ts_init,
545 );
546
547 depths.push(depth);
548
549 if let Some(limit) = limit
550 && depths.len() >= limit
551 {
552 break;
553 }
554 }
555
556 Ok(depths)
557}
558
559pub fn load_quotes<P: AsRef<Path>>(
571 filepath: P,
572 price_precision: Option<u8>,
573 size_precision: Option<u8>,
574 instrument_id: Option<InstrumentId>,
575 limit: Option<usize>,
576) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
577 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
579 let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
580
581 let mut current_price_precision = price_precision.unwrap_or(0);
582 let mut current_size_precision = size_precision.unwrap_or(0);
583 let mut reader = create_csv_reader(filepath)?;
584 let mut record = StringRecord::new();
585
586 while reader.read_record(&mut record)? {
587 let data: TardisQuoteRecord = record.deserialize(None)?;
588
589 if price_precision.is_none()
590 && let Some(bid_price) = data.bid_price
591 {
592 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
593 if inferred_price_precision > current_price_precision {
594 current_price_precision = inferred_price_precision;
595 }
596 }
597
598 if size_precision.is_none()
599 && let Some(bid_amount) = data.bid_amount
600 {
601 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
602 if inferred_size_precision > current_size_precision {
603 current_size_precision = inferred_size_precision;
604 }
605 }
606
607 let quote = parse_quote_record(
608 &data,
609 current_price_precision,
610 current_size_precision,
611 instrument_id,
612 );
613
614 quotes.push(quote);
615
616 if let Some(limit) = limit
617 && quotes.len() >= limit
618 {
619 break;
620 }
621 }
622
623 update_quotes_precision(
626 &mut quotes,
627 price_precision,
628 size_precision,
629 current_price_precision,
630 current_size_precision,
631 );
632
633 Ok(quotes)
634}
635
636pub fn load_trades<P: AsRef<Path>>(
648 filepath: P,
649 price_precision: Option<u8>,
650 size_precision: Option<u8>,
651 instrument_id: Option<InstrumentId>,
652 limit: Option<usize>,
653) -> Result<Vec<TradeTick>, Box<dyn Error>> {
654 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
656 let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
657
658 let mut current_price_precision = price_precision.unwrap_or(0);
659 let mut current_size_precision = size_precision.unwrap_or(0);
660 let mut reader = create_csv_reader(filepath)?;
661 let mut record = StringRecord::new();
662
663 while reader.read_record(&mut record)? {
664 let data: TardisTradeRecord = record.deserialize(None)?;
665
666 if price_precision.is_none() {
667 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
668 if inferred_price_precision > current_price_precision {
669 current_price_precision = inferred_price_precision;
670 }
671 }
672
673 if size_precision.is_none() {
674 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
675 if inferred_size_precision > current_size_precision {
676 current_size_precision = inferred_size_precision;
677 }
678 }
679
680 let size = Quantity::new_checked(data.amount, current_size_precision)?;
681
682 if size.is_positive() {
683 let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
684
685 trades.push(trade);
686
687 if let Some(limit) = limit
688 && trades.len() >= limit
689 {
690 break;
691 }
692 } else {
693 log::warn!("Skipping zero-sized trade: {data:?}");
694 }
695 }
696
697 update_trades_precision(
700 &mut trades,
701 price_precision,
702 size_precision,
703 current_price_precision,
704 current_size_precision,
705 );
706
707 Ok(trades)
708}
709
710pub fn load_funding_rates<P: AsRef<Path>>(
720 filepath: P,
721 instrument_id: Option<InstrumentId>,
722 limit: Option<usize>,
723) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
724 let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
726 let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
727
728 let mut reader = create_csv_reader(filepath)?;
729 let mut record = StringRecord::new();
730
731 while reader.read_record(&mut record)? {
732 let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
733
734 if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
736 funding_rates.push(funding_rate);
737
738 if let Some(limit) = limit
739 && funding_rates.len() >= limit
740 {
741 break;
742 }
743 }
744 }
745
746 Ok(funding_rates)
747}
748
749#[cfg(test)]
750mod tests {
751 use nautilus_model::{
752 enums::{AggressorSide, BookAction},
753 identifiers::TradeId,
754 types::Price,
755 };
756 use nautilus_testkit::common::{
757 get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
758 get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
759 get_tardis_huobi_quotes_path,
760 };
761 use rstest::*;
762
763 use super::*;
764 use crate::{parse::parse_price, tests::get_test_data_path};
765
766 #[rstest]
767 #[case(0.0, 0)]
768 #[case(42.0, 0)]
769 #[case(0.1, 1)]
770 #[case(0.25, 2)]
771 #[case(123.0001, 4)]
772 #[case(-42.987654321, 9)]
773 #[case(1.234_567_890_123, 12)]
774 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
775 assert_eq!(infer_precision(input), expected);
776 }
777
778 #[rstest]
779 pub fn test_dynamic_precision_inference() {
780 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
781binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
782binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
783binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
784binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
785binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
786
787 let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
788 std::fs::write(&temp_file, csv_data).unwrap();
789
790 let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
791
792 assert_eq!(deltas.len(), 6);
794
795 for (i, delta) in deltas.iter().skip(1).enumerate() {
797 assert_eq!(
798 delta.order.price.precision, 4,
799 "Price precision should be 4 for delta {i}",
800 );
801 assert_eq!(
802 delta.order.size.precision, 1,
803 "Size precision should be 1 for delta {i}",
804 );
805 }
806
807 assert_eq!(deltas[0].action, BookAction::Clear);
810
811 assert_eq!(deltas[1].order.price, parse_price(50000.0, 4));
812 assert_eq!(deltas[1].order.size, Quantity::new(1.0, 1));
813
814 assert_eq!(deltas[2].order.price, parse_price(49999.5, 4));
815 assert_eq!(deltas[2].order.size, Quantity::new(2.0, 1));
816
817 assert_eq!(deltas[3].order.price, parse_price(50000.12, 4));
818 assert_eq!(deltas[3].order.size, Quantity::new(1.5, 1));
819
820 assert_eq!(deltas[4].order.price, parse_price(49999.123, 4));
821 assert_eq!(deltas[4].order.size, Quantity::new(3.0, 1));
822
823 assert_eq!(deltas[5].order.price, parse_price(50000.1234, 4));
824 assert_eq!(deltas[5].order.size, Quantity::new(0.5, 1));
825
826 assert_eq!(
827 deltas[1].order.price.precision,
828 deltas[5].order.price.precision
829 );
830 assert_eq!(
831 deltas[1].order.size.precision,
832 deltas[3].order.size.precision
833 );
834
835 std::fs::remove_file(&temp_file).ok();
836 }
837
838 #[rstest]
839 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
842 #[case] price_precision: Option<u8>,
843 #[case] size_precision: Option<u8>,
844 ) {
845 let filepath = get_tardis_deribit_book_l2_path();
846 let deltas =
847 load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
848
849 assert_eq!(deltas.len(), 16);
851
852 assert_eq!(deltas[0].action, BookAction::Clear);
854
855 assert_eq!(
857 deltas[1].instrument_id,
858 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
859 );
860 assert_eq!(deltas[1].action, BookAction::Add);
861 assert_eq!(deltas[1].order.side, OrderSide::Sell);
862 assert_eq!(deltas[1].order.price, Price::from("6421.5"));
863 assert_eq!(deltas[1].order.size, Quantity::from("18640"));
864 assert_eq!(deltas[1].flags, 0);
865 assert_eq!(deltas[1].sequence, 0);
866 assert_eq!(deltas[1].ts_event, 1585699200245000000);
867 assert_eq!(deltas[1].ts_init, 1585699200355684000);
868 }
869
870 #[rstest]
871 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
874 #[case] price_precision: Option<u8>,
875 #[case] size_precision: Option<u8>,
876 ) {
877 let filepath = get_tardis_binance_snapshot5_path();
878 let depths =
879 load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
880 .unwrap();
881
882 assert_eq!(depths.len(), 10);
883 assert_eq!(
884 depths[0].instrument_id,
885 InstrumentId::from("BTCUSDT.BINANCE")
886 );
887 assert_eq!(depths[0].bids.len(), 10);
888 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
889 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
890 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
891 assert_eq!(depths[0].bids[0].order_id, 0);
892 assert_eq!(depths[0].asks.len(), 10);
893 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
894 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
895 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
896 assert_eq!(depths[0].asks[0].order_id, 0);
897 assert_eq!(depths[0].bid_counts[0], 1);
898 assert_eq!(depths[0].ask_counts[0], 1);
899 assert_eq!(
901 depths[0].flags,
902 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
903 );
904 assert_eq!(depths[0].ts_event, 1598918403696000000);
905 assert_eq!(depths[0].ts_init, 1598918403810979000);
906 assert_eq!(depths[0].sequence, 0);
907 }
908
909 #[rstest]
910 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
913 #[case] price_precision: Option<u8>,
914 #[case] size_precision: Option<u8>,
915 ) {
916 let filepath = get_tardis_binance_snapshot25_path();
917 let depths = load_depth10_from_snapshot25(
918 filepath,
919 price_precision,
920 size_precision,
921 None,
922 Some(100),
923 )
924 .unwrap();
925
926 assert_eq!(depths.len(), 10);
927 assert_eq!(
928 depths[0].instrument_id,
929 InstrumentId::from("BTCUSDT.BINANCE")
930 );
931 assert_eq!(depths[0].bids.len(), 10);
932 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
933 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
934 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
935 assert_eq!(depths[0].bids[0].order_id, 0);
936 assert_eq!(depths[0].asks.len(), 10);
937 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
938 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
939 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
940 assert_eq!(depths[0].asks[0].order_id, 0);
941 assert_eq!(depths[0].bid_counts[0], 1);
942 assert_eq!(depths[0].ask_counts[0], 1);
943 assert_eq!(
945 depths[0].flags,
946 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
947 );
948 assert_eq!(depths[0].ts_event, 1598918403696000000);
949 assert_eq!(depths[0].ts_init, 1598918403810979000);
950 assert_eq!(depths[0].sequence, 0);
951 }
952
953 #[rstest]
954 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
957 #[case] price_precision: Option<u8>,
958 #[case] size_precision: Option<u8>,
959 ) {
960 let filepath = get_tardis_huobi_quotes_path();
961 let quotes =
962 load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
963
964 assert_eq!(quotes.len(), 10);
965 assert_eq!(
966 quotes[0].instrument_id,
967 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
968 );
969 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
970 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
971 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
972 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
973 assert_eq!(quotes[0].ts_event, 1588291201099000000);
974 assert_eq!(quotes[0].ts_init, 1588291201234268000);
975 }
976
977 #[rstest]
978 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
981 #[case] price_precision: Option<u8>,
982 #[case] size_precision: Option<u8>,
983 ) {
984 let filepath = get_tardis_bitmex_trades_path();
985 let trades =
986 load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
987
988 assert_eq!(trades.len(), 10);
989 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
990 assert_eq!(trades[0].price, Price::from("8531.5"));
991 assert_eq!(trades[0].size, Quantity::from("2152"));
992 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
993 assert_eq!(
994 trades[0].trade_id,
995 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
996 );
997 assert_eq!(trades[0].ts_event, 1583020803145000000);
998 assert_eq!(trades[0].ts_init, 1583020803307160000);
999 }
1000
1001 #[rstest]
1002 pub fn test_load_trades_with_zero_sized_trade() {
1003 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1005binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1006binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1007binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1008binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1009
1010 let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1011 std::fs::write(&temp_file, csv_data).unwrap();
1012
1013 let trades = load_trades(
1014 &temp_file,
1015 Some(4),
1016 Some(1),
1017 None,
1018 None, )
1020 .unwrap();
1021
1022 assert_eq!(trades.len(), 3);
1024
1025 assert_eq!(trades[0].size, Quantity::from("1.0"));
1027 assert_eq!(trades[1].size, Quantity::from("1.5"));
1028 assert_eq!(trades[2].size, Quantity::from("3.0"));
1029
1030 assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1032 assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1033 assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1034
1035 std::fs::remove_file(&temp_file).ok();
1036 }
1037
1038 #[rstest]
1039 pub fn test_load_trades_from_local_file() {
1040 let filepath = get_test_data_path("csv/trades_1.csv");
1041 let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1042 assert_eq!(trades.len(), 2);
1043 assert_eq!(trades[0].price, Price::from("8531.5"));
1044 assert_eq!(trades[1].size, Quantity::from("1000"));
1045 }
1046
1047 #[rstest]
1048 pub fn test_load_deltas_from_local_file() {
1049 let filepath = get_test_data_path("csv/deltas_1.csv");
1050 let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1051
1052 assert_eq!(deltas.len(), 3);
1054 assert_eq!(deltas[0].action, BookAction::Clear);
1055 assert_eq!(deltas[1].order.price, Price::from("6421.5"));
1056 assert_eq!(deltas[2].order.size, Quantity::from("10000"));
1057 }
1058
1059 #[rstest]
1060 fn test_load_depth10_from_snapshot5_comprehensive() {
1061 let filepath = get_tardis_binance_snapshot5_path();
1062 let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1063
1064 assert_eq!(depths.len(), 10);
1065
1066 let first = &depths[0];
1067 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1068 assert_eq!(first.bids.len(), 10);
1069 assert_eq!(first.asks.len(), 10);
1070
1071 assert_eq!(first.bids[0].price, Price::from("11657.07"));
1073 assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1074 assert_eq!(first.bids[0].side, OrderSide::Buy);
1075
1076 assert_eq!(first.bids[1].price, Price::from("11656.97"));
1077 assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1078 assert_eq!(first.bids[1].side, OrderSide::Buy);
1079
1080 assert_eq!(first.bids[2].price, Price::from("11655.78"));
1081 assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1082 assert_eq!(first.bids[2].side, OrderSide::Buy);
1083
1084 assert_eq!(first.bids[3].price, Price::from("11655.77"));
1085 assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1086 assert_eq!(first.bids[3].side, OrderSide::Buy);
1087
1088 assert_eq!(first.bids[4].price, Price::from("11655.68"));
1089 assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1090 assert_eq!(first.bids[4].side, OrderSide::Buy);
1091
1092 for i in 5..10 {
1094 assert_eq!(first.bids[i].price.raw, 0);
1095 assert_eq!(first.bids[i].size.raw, 0);
1096 assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1097 }
1098
1099 assert_eq!(first.asks[0].price, Price::from("11657.08"));
1101 assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1102 assert_eq!(first.asks[0].side, OrderSide::Sell);
1103
1104 assert_eq!(first.asks[1].price, Price::from("11657.54"));
1105 assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1106 assert_eq!(first.asks[1].side, OrderSide::Sell);
1107
1108 assert_eq!(first.asks[2].price, Price::from("11657.56"));
1109 assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1110 assert_eq!(first.asks[2].side, OrderSide::Sell);
1111
1112 assert_eq!(first.asks[3].price, Price::from("11657.61"));
1113 assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1114 assert_eq!(first.asks[3].side, OrderSide::Sell);
1115
1116 assert_eq!(first.asks[4].price, Price::from("11657.92"));
1117 assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1118 assert_eq!(first.asks[4].side, OrderSide::Sell);
1119
1120 for i in 5..10 {
1122 assert_eq!(first.asks[i].price.raw, 0);
1123 assert_eq!(first.asks[i].size.raw, 0);
1124 assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1125 }
1126
1127 for i in 1..5 {
1129 assert!(
1130 first.bids[i].price < first.bids[i - 1].price,
1131 "Bid price at level {} should be less than level {}",
1132 i,
1133 i - 1
1134 );
1135 }
1136
1137 for i in 1..5 {
1139 assert!(
1140 first.asks[i].price > first.asks[i - 1].price,
1141 "Ask price at level {} should be greater than level {}",
1142 i,
1143 i - 1
1144 );
1145 }
1146
1147 assert!(
1149 first.asks[0].price > first.bids[0].price,
1150 "Best ask should be greater than best bid"
1151 );
1152
1153 for i in 0..5 {
1155 assert_eq!(first.bid_counts[i], 1);
1156 assert_eq!(first.ask_counts[i], 1);
1157 }
1158 for i in 5..10 {
1159 assert_eq!(first.bid_counts[i], 0);
1160 assert_eq!(first.ask_counts[i], 0);
1161 }
1162
1163 assert_eq!(
1165 first.flags,
1166 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1167 );
1168 assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1169 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1170 assert_eq!(first.sequence, 0);
1171 }
1172
1173 #[rstest]
1174 fn test_load_depth10_from_snapshot25_comprehensive() {
1175 let filepath = get_tardis_binance_snapshot25_path();
1176 let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1177
1178 assert_eq!(depths.len(), 10);
1179
1180 let first = &depths[0];
1181 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1182 assert_eq!(first.bids.len(), 10);
1183 assert_eq!(first.asks.len(), 10);
1184
1185 let expected_bids = vec![
1187 ("11657.07", "10.896"),
1188 ("11656.97", "0.2"),
1189 ("11655.78", "0.2"),
1190 ("11655.77", "0.98"),
1191 ("11655.68", "0.111"),
1192 ("11655.66", "0.077"),
1193 ("11655.57", "0.34"),
1194 ("11655.48", "0.4"),
1195 ("11655.26", "1.185"),
1196 ("11654.86", "0.195"),
1197 ];
1198
1199 for (i, (price, size)) in expected_bids.iter().enumerate() {
1200 assert_eq!(first.bids[i].price, Price::from(*price));
1201 assert_eq!(first.bids[i].size, Quantity::from(*size));
1202 assert_eq!(first.bids[i].side, OrderSide::Buy);
1203 }
1204
1205 let expected_asks = vec![
1207 ("11657.08", "1.714"),
1208 ("11657.54", "5.4"),
1209 ("11657.56", "0.238"),
1210 ("11657.61", "0.077"),
1211 ("11657.92", "0.918"),
1212 ("11658.09", "1.015"),
1213 ("11658.12", "0.665"),
1214 ("11658.19", "0.583"),
1215 ("11658.28", "0.255"),
1216 ("11658.29", "0.656"),
1217 ];
1218
1219 for (i, (price, size)) in expected_asks.iter().enumerate() {
1220 assert_eq!(first.asks[i].price, Price::from(*price));
1221 assert_eq!(first.asks[i].size, Quantity::from(*size));
1222 assert_eq!(first.asks[i].side, OrderSide::Sell);
1223 }
1224
1225 for i in 1..10 {
1227 assert!(
1228 first.bids[i].price < first.bids[i - 1].price,
1229 "Bid price at level {} ({}) should be less than level {} ({})",
1230 i,
1231 first.bids[i].price,
1232 i - 1,
1233 first.bids[i - 1].price
1234 );
1235 }
1236
1237 for i in 1..10 {
1239 assert!(
1240 first.asks[i].price > first.asks[i - 1].price,
1241 "Ask price at level {} ({}) should be greater than level {} ({})",
1242 i,
1243 first.asks[i].price,
1244 i - 1,
1245 first.asks[i - 1].price
1246 );
1247 }
1248
1249 assert!(
1251 first.asks[0].price > first.bids[0].price,
1252 "Best ask ({}) should be greater than best bid ({})",
1253 first.asks[0].price,
1254 first.bids[0].price
1255 );
1256
1257 for i in 0..10 {
1259 assert_eq!(first.bid_counts[i], 1);
1260 assert_eq!(first.ask_counts[i], 1);
1261 }
1262
1263 assert_eq!(
1265 first.flags,
1266 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1267 );
1268 assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1269 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1270 assert_eq!(first.sequence, 0);
1271 }
1272
1273 #[rstest]
1274 fn test_snapshot_csv_field_order_interleaved() {
1275 let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1279asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1280asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1281asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1282asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1283asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1284binance-futures,BTCUSDT,1000000,2000000,\
1285100.5,1.0,100.4,2.0,\
1286100.6,1.1,100.3,2.1,\
1287100.7,1.2,100.2,2.2,\
1288100.8,1.3,100.1,2.3,\
1289100.9,1.4,100.0,2.4";
1290
1291 let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1292 std::fs::write(&temp_file, csv_data).unwrap();
1293
1294 let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1295 assert_eq!(depths.len(), 1);
1296
1297 let depth = &depths[0];
1298
1299 assert_eq!(depth.bids[0].price, Price::from("100.4"));
1301 assert_eq!(depth.bids[1].price, Price::from("100.3"));
1302 assert_eq!(depth.bids[2].price, Price::from("100.2"));
1303 assert_eq!(depth.bids[3].price, Price::from("100.1"));
1304 assert_eq!(depth.bids[4].price, Price::from("100.0"));
1305
1306 assert_eq!(depth.asks[0].price, Price::from("100.5"));
1308 assert_eq!(depth.asks[1].price, Price::from("100.6"));
1309 assert_eq!(depth.asks[2].price, Price::from("100.7"));
1310 assert_eq!(depth.asks[3].price, Price::from("100.8"));
1311 assert_eq!(depth.asks[4].price, Price::from("100.9"));
1312
1313 assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1315 assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1316
1317 std::fs::remove_file(temp_file).unwrap();
1318 }
1319
1320 #[rstest]
1321 fn test_load_deltas_limit_includes_clear_deltas() {
1322 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1325binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1326binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1327binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1328binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1329binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
1330binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,2.0
1331binance-futures,BTCUSDT,1640995205000000,1640995205100000,false,bid,49997.0,0.5";
1332
1333 let temp_file = std::env::temp_dir().join("test_load_deltas_limit.csv");
1334 std::fs::write(&temp_file, csv_data).unwrap();
1335
1336 let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(5)).unwrap();
1338
1339 assert_eq!(deltas.len(), 5);
1341 assert_eq!(deltas[0].action, BookAction::Clear);
1342 assert_eq!(deltas[1].action, BookAction::Add);
1343 assert_eq!(deltas[2].action, BookAction::Add);
1344 assert_eq!(deltas[3].action, BookAction::Update);
1345 assert_eq!(deltas[4].action, BookAction::Update);
1346
1347 assert_eq!(deltas[3].order.price, parse_price(49999.0, 1));
1349
1350 std::fs::remove_file(&temp_file).ok();
1351 }
1352
1353 #[rstest]
1354 fn test_load_deltas_limit_stops_at_clear() {
1355 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1357binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1358binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
1359
1360 let temp_file = std::env::temp_dir().join("test_load_deltas_limit_stops_at_clear.csv");
1361 std::fs::write(&temp_file, csv_data).unwrap();
1362
1363 let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(1)).unwrap();
1365
1366 assert_eq!(deltas.len(), 1);
1367 assert_eq!(deltas[0].action, BookAction::Clear);
1368
1369 std::fs::remove_file(&temp_file).ok();
1370 }
1371
1372 #[rstest]
1373 fn test_load_deltas_limit_with_mid_day_snapshot() {
1374 let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
1377 let deltas = load_deltas(filepath, Some(1), Some(1), None, Some(5)).unwrap();
1378
1379 assert_eq!(deltas.len(), 5);
1382 assert_eq!(deltas[0].action, BookAction::Clear);
1383 }
1384}