1use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick,
24 },
25 enums::{OrderSide, RecordFlag},
26 identifiers::InstrumentId,
27 types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31 csv::{
32 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
33 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
34 record::{
35 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
36 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
37 },
38 },
39 parse::{parse_instrument_id, parse_timestamp},
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43 if explicit.is_some() {
44 return false;
45 }
46
47 let inferred = infer_precision(value).min(FIXED_PRECISION);
48 if inferred > *current {
49 *current = inferred;
50 true
51 } else {
52 false
53 }
54}
55
56fn update_deltas_precision(
57 deltas: &mut [OrderBookDelta],
58 price_precision: Option<u8>,
59 size_precision: Option<u8>,
60 current_price_precision: u8,
61 current_size_precision: u8,
62) {
63 for delta in deltas {
64 if price_precision.is_none() {
65 delta.order.price.precision = current_price_precision;
66 }
67 if size_precision.is_none() {
68 delta.order.size.precision = current_size_precision;
69 }
70 }
71}
72
73fn update_quotes_precision(
74 quotes: &mut [QuoteTick],
75 price_precision: Option<u8>,
76 size_precision: Option<u8>,
77 current_price_precision: u8,
78 current_size_precision: u8,
79) {
80 for quote in quotes {
81 if price_precision.is_none() {
82 quote.bid_price.precision = current_price_precision;
83 quote.ask_price.precision = current_price_precision;
84 }
85 if size_precision.is_none() {
86 quote.bid_size.precision = current_size_precision;
87 quote.ask_size.precision = current_size_precision;
88 }
89 }
90}
91
92fn update_trades_precision(
93 trades: &mut [TradeTick],
94 price_precision: Option<u8>,
95 size_precision: Option<u8>,
96 current_price_precision: u8,
97 current_size_precision: u8,
98) {
99 for trade in trades {
100 if price_precision.is_none() {
101 trade.price.precision = current_price_precision;
102 }
103 if size_precision.is_none() {
104 trade.size.precision = current_size_precision;
105 }
106 }
107}
108
109pub fn load_deltas<P: AsRef<Path>>(
121 filepath: P,
122 price_precision: Option<u8>,
123 size_precision: Option<u8>,
124 instrument_id: Option<InstrumentId>,
125 limit: Option<usize>,
126) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
127 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
129 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
130
131 let mut current_price_precision = price_precision.unwrap_or(0);
132 let mut current_size_precision = size_precision.unwrap_or(0);
133 let mut last_ts_event = UnixNanos::default();
134 let mut last_is_snapshot = false;
135
136 let mut reader = create_csv_reader(filepath)?;
137 let mut record = StringRecord::new();
138
139 while reader.read_record(&mut record)? {
140 if let Some(limit) = limit
141 && deltas.len() >= limit
142 {
143 break;
144 }
145
146 let data: TardisBookUpdateRecord = record.deserialize(None)?;
147
148 update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
149 update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
150
151 if data.is_snapshot && !last_is_snapshot {
153 let clear_instrument_id =
154 instrument_id.unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
155 let ts_event = parse_timestamp(data.timestamp);
156 let ts_init = parse_timestamp(data.local_timestamp);
157
158 if last_ts_event != ts_event
159 && let Some(last_delta) = deltas.last_mut()
160 {
161 last_delta.flags = RecordFlag::F_LAST.value();
162 }
163 last_ts_event = ts_event;
164
165 let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
166 deltas.push(clear_delta);
167
168 if let Some(limit) = limit
169 && deltas.len() >= limit
170 {
171 break;
172 }
173 }
174 last_is_snapshot = data.is_snapshot;
175
176 let delta = match parse_delta_record(
177 &data,
178 current_price_precision,
179 current_size_precision,
180 instrument_id,
181 ) {
182 Ok(d) => d,
183 Err(e) => {
184 log::warn!("Skipping invalid delta record: {e}");
185 continue;
186 }
187 };
188
189 let ts_event = delta.ts_event;
190 if last_ts_event != ts_event
191 && let Some(last_delta) = deltas.last_mut()
192 {
193 last_delta.flags = RecordFlag::F_LAST.value();
194 }
195
196 last_ts_event = ts_event;
197
198 deltas.push(delta);
199 }
200
201 if let Some(last_delta) = deltas.last_mut() {
203 last_delta.flags = RecordFlag::F_LAST.value();
204 }
205
206 update_deltas_precision(
209 &mut deltas,
210 price_precision,
211 size_precision,
212 current_price_precision,
213 current_size_precision,
214 );
215
216 Ok(deltas)
217}
218
219pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
231 filepath: P,
232 price_precision: Option<u8>,
233 size_precision: Option<u8>,
234 instrument_id: Option<InstrumentId>,
235 limit: Option<usize>,
236) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
237 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
239 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
240
241 let mut current_price_precision = price_precision.unwrap_or(0);
242 let mut current_size_precision = size_precision.unwrap_or(0);
243
244 let mut reader = create_csv_reader(filepath)?;
245 let mut record = StringRecord::new();
246
247 while reader.read_record(&mut record)? {
248 let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
249
250 let mut precision_updated = false;
252
253 if price_precision.is_none()
254 && let Some(bid_price) = data.bids_0_price
255 {
256 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
257 if inferred_price_precision > current_price_precision {
258 current_price_precision = inferred_price_precision;
259 precision_updated = true;
260 }
261 }
262
263 if size_precision.is_none()
264 && let Some(bid_amount) = data.bids_0_amount
265 {
266 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
267 if inferred_size_precision > current_size_precision {
268 current_size_precision = inferred_size_precision;
269 precision_updated = true;
270 }
271 }
272
273 if precision_updated {
275 for depth in &mut depths {
276 for i in 0..DEPTH10_LEN {
277 if price_precision.is_none() {
278 depth.bids[i].price.precision = current_price_precision;
279 depth.asks[i].price.precision = current_price_precision;
280 }
281 if size_precision.is_none() {
282 depth.bids[i].size.precision = current_size_precision;
283 depth.asks[i].size.precision = current_size_precision;
284 }
285 }
286 }
287 }
288
289 let instrument_id = match &instrument_id {
290 Some(id) => *id,
291 None => parse_instrument_id(&data.exchange, data.symbol),
292 };
293 let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
295 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
297 let ts_init = parse_timestamp(data.local_timestamp);
298
299 let mut bids = [NULL_ORDER; DEPTH10_LEN];
301 let mut asks = [NULL_ORDER; DEPTH10_LEN];
302 let mut bid_counts = [0u32; DEPTH10_LEN];
303 let mut ask_counts = [0u32; DEPTH10_LEN];
304
305 for i in 0..=4 {
306 let (bid_order, bid_count) = create_book_order(
308 OrderSide::Buy,
309 match i {
310 0 => data.bids_0_price,
311 1 => data.bids_1_price,
312 2 => data.bids_2_price,
313 3 => data.bids_3_price,
314 4 => data.bids_4_price,
315 _ => unreachable!("i is constrained to 0..=4 by loop"),
316 },
317 match i {
318 0 => data.bids_0_amount,
319 1 => data.bids_1_amount,
320 2 => data.bids_2_amount,
321 3 => data.bids_3_amount,
322 4 => data.bids_4_amount,
323 _ => unreachable!("i is constrained to 0..=4 by loop"),
324 },
325 current_price_precision,
326 current_size_precision,
327 );
328 bids[i] = bid_order;
329 bid_counts[i] = bid_count;
330
331 let (ask_order, ask_count) = create_book_order(
333 OrderSide::Sell,
334 match i {
335 0 => data.asks_0_price,
336 1 => data.asks_1_price,
337 2 => data.asks_2_price,
338 3 => data.asks_3_price,
339 4 => data.asks_4_price,
340 _ => None, },
342 match i {
343 0 => data.asks_0_amount,
344 1 => data.asks_1_amount,
345 2 => data.asks_2_amount,
346 3 => data.asks_3_amount,
347 4 => data.asks_4_amount,
348 _ => None, },
350 current_price_precision,
351 current_size_precision,
352 );
353 asks[i] = ask_order;
354 ask_counts[i] = ask_count;
355 }
356
357 let depth = OrderBookDepth10::new(
358 instrument_id,
359 bids,
360 asks,
361 bid_counts,
362 ask_counts,
363 flags,
364 sequence,
365 ts_event,
366 ts_init,
367 );
368
369 depths.push(depth);
370
371 if let Some(limit) = limit
372 && depths.len() >= limit
373 {
374 break;
375 }
376 }
377
378 Ok(depths)
379}
380
381pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
389 filepath: P,
390 price_precision: Option<u8>,
391 size_precision: Option<u8>,
392 instrument_id: Option<InstrumentId>,
393 limit: Option<usize>,
394) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
395 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
397 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
398
399 let mut current_price_precision = price_precision.unwrap_or(0);
400 let mut current_size_precision = size_precision.unwrap_or(0);
401 let mut reader = create_csv_reader(filepath)?;
402 let mut record = StringRecord::new();
403
404 while reader.read_record(&mut record)? {
405 let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
406
407 let mut precision_updated = false;
409
410 if price_precision.is_none()
411 && let Some(bid_price) = data.bids_0_price
412 {
413 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
414 if inferred_price_precision > current_price_precision {
415 current_price_precision = inferred_price_precision;
416 precision_updated = true;
417 }
418 }
419
420 if size_precision.is_none()
421 && let Some(bid_amount) = data.bids_0_amount
422 {
423 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
424 if inferred_size_precision > current_size_precision {
425 current_size_precision = inferred_size_precision;
426 precision_updated = true;
427 }
428 }
429
430 if precision_updated {
432 for depth in &mut depths {
433 for i in 0..DEPTH10_LEN {
434 if price_precision.is_none() {
435 depth.bids[i].price.precision = current_price_precision;
436 depth.asks[i].price.precision = current_price_precision;
437 }
438 if size_precision.is_none() {
439 depth.bids[i].size.precision = current_size_precision;
440 depth.asks[i].size.precision = current_size_precision;
441 }
442 }
443 }
444 }
445
446 let instrument_id = match &instrument_id {
447 Some(id) => *id,
448 None => parse_instrument_id(&data.exchange, data.symbol),
449 };
450 let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
452 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
454 let ts_init = parse_timestamp(data.local_timestamp);
455
456 let mut bids = [NULL_ORDER; DEPTH10_LEN];
458 let mut asks = [NULL_ORDER; DEPTH10_LEN];
459 let mut bid_counts = [0u32; DEPTH10_LEN];
460 let mut ask_counts = [0u32; DEPTH10_LEN];
461
462 for i in 0..DEPTH10_LEN {
464 let (bid_order, bid_count) = create_book_order(
466 OrderSide::Buy,
467 match i {
468 0 => data.bids_0_price,
469 1 => data.bids_1_price,
470 2 => data.bids_2_price,
471 3 => data.bids_3_price,
472 4 => data.bids_4_price,
473 5 => data.bids_5_price,
474 6 => data.bids_6_price,
475 7 => data.bids_7_price,
476 8 => data.bids_8_price,
477 9 => data.bids_9_price,
478 _ => unreachable!("i is constrained to 0..10 by loop"),
479 },
480 match i {
481 0 => data.bids_0_amount,
482 1 => data.bids_1_amount,
483 2 => data.bids_2_amount,
484 3 => data.bids_3_amount,
485 4 => data.bids_4_amount,
486 5 => data.bids_5_amount,
487 6 => data.bids_6_amount,
488 7 => data.bids_7_amount,
489 8 => data.bids_8_amount,
490 9 => data.bids_9_amount,
491 _ => unreachable!("i is constrained to 0..10 by loop"),
492 },
493 current_price_precision,
494 current_size_precision,
495 );
496 bids[i] = bid_order;
497 bid_counts[i] = bid_count;
498
499 let (ask_order, ask_count) = create_book_order(
501 OrderSide::Sell,
502 match i {
503 0 => data.asks_0_price,
504 1 => data.asks_1_price,
505 2 => data.asks_2_price,
506 3 => data.asks_3_price,
507 4 => data.asks_4_price,
508 5 => data.asks_5_price,
509 6 => data.asks_6_price,
510 7 => data.asks_7_price,
511 8 => data.asks_8_price,
512 9 => data.asks_9_price,
513 _ => unreachable!("i is constrained to 0..10 by loop"),
514 },
515 match i {
516 0 => data.asks_0_amount,
517 1 => data.asks_1_amount,
518 2 => data.asks_2_amount,
519 3 => data.asks_3_amount,
520 4 => data.asks_4_amount,
521 5 => data.asks_5_amount,
522 6 => data.asks_6_amount,
523 7 => data.asks_7_amount,
524 8 => data.asks_8_amount,
525 9 => data.asks_9_amount,
526 _ => unreachable!("i is constrained to 0..10 by loop"),
527 },
528 current_price_precision,
529 current_size_precision,
530 );
531 asks[i] = ask_order;
532 ask_counts[i] = ask_count;
533 }
534
535 let depth = OrderBookDepth10::new(
536 instrument_id,
537 bids,
538 asks,
539 bid_counts,
540 ask_counts,
541 flags,
542 sequence,
543 ts_event,
544 ts_init,
545 );
546
547 depths.push(depth);
548
549 if let Some(limit) = limit
550 && depths.len() >= limit
551 {
552 break;
553 }
554 }
555
556 Ok(depths)
557}
558
559pub fn load_quotes<P: AsRef<Path>>(
571 filepath: P,
572 price_precision: Option<u8>,
573 size_precision: Option<u8>,
574 instrument_id: Option<InstrumentId>,
575 limit: Option<usize>,
576) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
577 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
579 let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
580
581 let mut current_price_precision = price_precision.unwrap_or(0);
582 let mut current_size_precision = size_precision.unwrap_or(0);
583 let mut reader = create_csv_reader(filepath)?;
584 let mut record = StringRecord::new();
585
586 while reader.read_record(&mut record)? {
587 let data: TardisQuoteRecord = record.deserialize(None)?;
588
589 if price_precision.is_none()
590 && let Some(bid_price) = data.bid_price
591 {
592 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
593 if inferred_price_precision > current_price_precision {
594 current_price_precision = inferred_price_precision;
595 }
596 }
597
598 if size_precision.is_none()
599 && let Some(bid_amount) = data.bid_amount
600 {
601 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
602 if inferred_size_precision > current_size_precision {
603 current_size_precision = inferred_size_precision;
604 }
605 }
606
607 let quote = parse_quote_record(
608 &data,
609 current_price_precision,
610 current_size_precision,
611 instrument_id,
612 );
613
614 quotes.push(quote);
615
616 if let Some(limit) = limit
617 && quotes.len() >= limit
618 {
619 break;
620 }
621 }
622
623 update_quotes_precision(
626 &mut quotes,
627 price_precision,
628 size_precision,
629 current_price_precision,
630 current_size_precision,
631 );
632
633 Ok(quotes)
634}
635
636pub fn load_trades<P: AsRef<Path>>(
648 filepath: P,
649 price_precision: Option<u8>,
650 size_precision: Option<u8>,
651 instrument_id: Option<InstrumentId>,
652 limit: Option<usize>,
653) -> Result<Vec<TradeTick>, Box<dyn Error>> {
654 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
656 let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
657
658 let mut current_price_precision = price_precision.unwrap_or(0);
659 let mut current_size_precision = size_precision.unwrap_or(0);
660 let mut reader = create_csv_reader(filepath)?;
661 let mut record = StringRecord::new();
662
663 while reader.read_record(&mut record)? {
664 let data: TardisTradeRecord = record.deserialize(None)?;
665
666 if price_precision.is_none() {
667 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
668 if inferred_price_precision > current_price_precision {
669 current_price_precision = inferred_price_precision;
670 }
671 }
672
673 if size_precision.is_none() {
674 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
675 if inferred_size_precision > current_size_precision {
676 current_size_precision = inferred_size_precision;
677 }
678 }
679
680 let size = Quantity::new_checked(data.amount, current_size_precision)?;
681
682 if size.is_positive() {
683 let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
684
685 trades.push(trade);
686
687 if let Some(limit) = limit
688 && trades.len() >= limit
689 {
690 break;
691 }
692 } else {
693 log::warn!("Skipping zero-sized trade: {data:?}");
694 }
695 }
696
697 update_trades_precision(
700 &mut trades,
701 price_precision,
702 size_precision,
703 current_price_precision,
704 current_size_precision,
705 );
706
707 Ok(trades)
708}
709
710pub fn load_funding_rates<P: AsRef<Path>>(
720 filepath: P,
721 instrument_id: Option<InstrumentId>,
722 limit: Option<usize>,
723) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
724 let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
726 let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
727
728 let mut reader = create_csv_reader(filepath)?;
729 let mut record = StringRecord::new();
730
731 while reader.read_record(&mut record)? {
732 let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
733
734 if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
736 funding_rates.push(funding_rate);
737
738 if let Some(limit) = limit
739 && funding_rates.len() >= limit
740 {
741 break;
742 }
743 }
744 }
745
746 Ok(funding_rates)
747}
748
749#[cfg(test)]
750mod tests {
751 use std::{fs, fs::File, sync::Arc};
752
753 use nautilus_core::paths::get_test_data_path as get_test_data_root;
754 use nautilus_model::{
755 enums::{AggressorSide, BookAction},
756 identifiers::{InstrumentId, TradeId},
757 types::Price,
758 };
759 use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
760 use nautilus_testkit::common::{
761 get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
762 get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
763 get_tardis_huobi_quotes_path,
764 };
765 use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
766 use rstest::*;
767
768 use super::*;
769 use crate::{common::testing::get_test_data_path, parse::parse_price};
770
771 #[rstest]
772 #[case(0.0, 0)]
773 #[case(42.0, 0)]
774 #[case(0.1, 1)]
775 #[case(0.25, 2)]
776 #[case(123.0001, 4)]
777 #[case(-42.987654321, 9)]
778 #[case(1.234_567_890_123, 12)]
779 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
780 assert_eq!(infer_precision(input), expected);
781 }
782
783 #[rstest]
784 pub fn test_dynamic_precision_inference() {
785 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
786binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
787binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
788binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
789binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
790binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
791
792 let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
793 std::fs::write(&temp_file, csv_data).unwrap();
794
795 let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
796
797 assert_eq!(deltas.len(), 6);
799
800 for (i, delta) in deltas.iter().skip(1).enumerate() {
802 assert_eq!(
803 delta.order.price.precision, 4,
804 "Price precision should be 4 for delta {i}",
805 );
806 assert_eq!(
807 delta.order.size.precision, 1,
808 "Size precision should be 1 for delta {i}",
809 );
810 }
811
812 assert_eq!(deltas[0].action, BookAction::Clear);
815
816 assert_eq!(deltas[1].order.price, parse_price(50000.0, 4));
817 assert_eq!(deltas[1].order.size, Quantity::new(1.0, 1));
818
819 assert_eq!(deltas[2].order.price, parse_price(49999.5, 4));
820 assert_eq!(deltas[2].order.size, Quantity::new(2.0, 1));
821
822 assert_eq!(deltas[3].order.price, parse_price(50000.12, 4));
823 assert_eq!(deltas[3].order.size, Quantity::new(1.5, 1));
824
825 assert_eq!(deltas[4].order.price, parse_price(49999.123, 4));
826 assert_eq!(deltas[4].order.size, Quantity::new(3.0, 1));
827
828 assert_eq!(deltas[5].order.price, parse_price(50000.1234, 4));
829 assert_eq!(deltas[5].order.size, Quantity::new(0.5, 1));
830
831 assert_eq!(
832 deltas[1].order.price.precision,
833 deltas[5].order.price.precision
834 );
835 assert_eq!(
836 deltas[1].order.size.precision,
837 deltas[3].order.size.precision
838 );
839
840 std::fs::remove_file(&temp_file).ok();
841 }
842
843 #[rstest]
844 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
847 #[case] price_precision: Option<u8>,
848 #[case] size_precision: Option<u8>,
849 ) {
850 let filepath = get_tardis_deribit_book_l2_path();
851 let deltas =
852 load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
853
854 assert_eq!(deltas.len(), 16);
856
857 assert_eq!(deltas[0].action, BookAction::Clear);
859
860 assert_eq!(
862 deltas[1].instrument_id,
863 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
864 );
865 assert_eq!(deltas[1].action, BookAction::Add);
866 assert_eq!(deltas[1].order.side, OrderSide::Sell);
867 assert_eq!(deltas[1].order.price, Price::from("6421.5"));
868 assert_eq!(deltas[1].order.size, Quantity::from("18640"));
869 assert_eq!(deltas[1].flags, 0);
870 assert_eq!(deltas[1].sequence, 0);
871 assert_eq!(deltas[1].ts_event, 1585699200245000000);
872 assert_eq!(deltas[1].ts_init, 1585699200355684000);
873 }
874
875 #[rstest]
876 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
879 #[case] price_precision: Option<u8>,
880 #[case] size_precision: Option<u8>,
881 ) {
882 let filepath = get_tardis_binance_snapshot5_path();
883 let depths =
884 load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
885 .unwrap();
886
887 assert_eq!(depths.len(), 10);
888 assert_eq!(
889 depths[0].instrument_id,
890 InstrumentId::from("BTCUSDT.BINANCE")
891 );
892 assert_eq!(depths[0].bids.len(), 10);
893 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
894 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
895 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
896 assert_eq!(depths[0].bids[0].order_id, 0);
897 assert_eq!(depths[0].asks.len(), 10);
898 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
899 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
900 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
901 assert_eq!(depths[0].asks[0].order_id, 0);
902 assert_eq!(depths[0].bid_counts[0], 1);
903 assert_eq!(depths[0].ask_counts[0], 1);
904 assert_eq!(
906 depths[0].flags,
907 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
908 );
909 assert_eq!(depths[0].ts_event, 1598918403696000000);
910 assert_eq!(depths[0].ts_init, 1598918403810979000);
911 assert_eq!(depths[0].sequence, 0);
912 }
913
914 #[rstest]
915 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
918 #[case] price_precision: Option<u8>,
919 #[case] size_precision: Option<u8>,
920 ) {
921 let filepath = get_tardis_binance_snapshot25_path();
922 let depths = load_depth10_from_snapshot25(
923 filepath,
924 price_precision,
925 size_precision,
926 None,
927 Some(100),
928 )
929 .unwrap();
930
931 assert_eq!(depths.len(), 10);
932 assert_eq!(
933 depths[0].instrument_id,
934 InstrumentId::from("BTCUSDT.BINANCE")
935 );
936 assert_eq!(depths[0].bids.len(), 10);
937 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
938 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
939 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
940 assert_eq!(depths[0].bids[0].order_id, 0);
941 assert_eq!(depths[0].asks.len(), 10);
942 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
943 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
944 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
945 assert_eq!(depths[0].asks[0].order_id, 0);
946 assert_eq!(depths[0].bid_counts[0], 1);
947 assert_eq!(depths[0].ask_counts[0], 1);
948 assert_eq!(
950 depths[0].flags,
951 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
952 );
953 assert_eq!(depths[0].ts_event, 1598918403696000000);
954 assert_eq!(depths[0].ts_init, 1598918403810979000);
955 assert_eq!(depths[0].sequence, 0);
956 }
957
958 #[rstest]
959 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
962 #[case] price_precision: Option<u8>,
963 #[case] size_precision: Option<u8>,
964 ) {
965 let filepath = get_tardis_huobi_quotes_path();
966 let quotes =
967 load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
968
969 assert_eq!(quotes.len(), 10);
970 assert_eq!(
971 quotes[0].instrument_id,
972 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
973 );
974 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
975 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
976 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
977 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
978 assert_eq!(quotes[0].ts_event, 1588291201099000000);
979 assert_eq!(quotes[0].ts_init, 1588291201234268000);
980 }
981
982 #[rstest]
983 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
986 #[case] price_precision: Option<u8>,
987 #[case] size_precision: Option<u8>,
988 ) {
989 let filepath = get_tardis_bitmex_trades_path();
990 let trades =
991 load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
992
993 assert_eq!(trades.len(), 10);
994 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
995 assert_eq!(trades[0].price, Price::from("8531.5"));
996 assert_eq!(trades[0].size, Quantity::from("2152"));
997 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
998 assert_eq!(
999 trades[0].trade_id,
1000 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1001 );
1002 assert_eq!(trades[0].ts_event, 1583020803145000000);
1003 assert_eq!(trades[0].ts_init, 1583020803307160000);
1004 }
1005
1006 #[rstest]
1007 pub fn test_load_trades_with_zero_sized_trade() {
1008 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1010binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1011binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1012binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1013binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1014
1015 let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1016 std::fs::write(&temp_file, csv_data).unwrap();
1017
1018 let trades = load_trades(
1019 &temp_file,
1020 Some(4),
1021 Some(1),
1022 None,
1023 None, )
1025 .unwrap();
1026
1027 assert_eq!(trades.len(), 3);
1029
1030 assert_eq!(trades[0].size, Quantity::from("1.0"));
1032 assert_eq!(trades[1].size, Quantity::from("1.5"));
1033 assert_eq!(trades[2].size, Quantity::from("3.0"));
1034
1035 assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1037 assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1038 assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1039
1040 std::fs::remove_file(&temp_file).ok();
1041 }
1042
1043 #[rstest]
1044 pub fn test_load_trades_from_local_file() {
1045 let filepath = get_test_data_path("csv/trades_1.csv");
1046 let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1047 assert_eq!(trades.len(), 2);
1048 assert_eq!(trades[0].price, Price::from("8531.5"));
1049 assert_eq!(trades[1].size, Quantity::from("1000"));
1050 }
1051
1052 #[rstest]
1053 pub fn test_load_deltas_from_local_file() {
1054 let filepath = get_test_data_path("csv/deltas_1.csv");
1055 let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1056
1057 assert_eq!(deltas.len(), 3);
1059 assert_eq!(deltas[0].action, BookAction::Clear);
1060 assert_eq!(deltas[1].order.price, Price::from("6421.5"));
1061 assert_eq!(deltas[2].order.size, Quantity::from("10000"));
1062 }
1063
1064 #[rstest]
1065 fn test_load_depth10_from_snapshot5_comprehensive() {
1066 let filepath = get_tardis_binance_snapshot5_path();
1067 let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1068
1069 assert_eq!(depths.len(), 10);
1070
1071 let first = &depths[0];
1072 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1073 assert_eq!(first.bids.len(), 10);
1074 assert_eq!(first.asks.len(), 10);
1075
1076 assert_eq!(first.bids[0].price, Price::from("11657.07"));
1078 assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1079 assert_eq!(first.bids[0].side, OrderSide::Buy);
1080
1081 assert_eq!(first.bids[1].price, Price::from("11656.97"));
1082 assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1083 assert_eq!(first.bids[1].side, OrderSide::Buy);
1084
1085 assert_eq!(first.bids[2].price, Price::from("11655.78"));
1086 assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1087 assert_eq!(first.bids[2].side, OrderSide::Buy);
1088
1089 assert_eq!(first.bids[3].price, Price::from("11655.77"));
1090 assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1091 assert_eq!(first.bids[3].side, OrderSide::Buy);
1092
1093 assert_eq!(first.bids[4].price, Price::from("11655.68"));
1094 assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1095 assert_eq!(first.bids[4].side, OrderSide::Buy);
1096
1097 for i in 5..10 {
1099 assert_eq!(first.bids[i].price.raw, 0);
1100 assert_eq!(first.bids[i].size.raw, 0);
1101 assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1102 }
1103
1104 assert_eq!(first.asks[0].price, Price::from("11657.08"));
1106 assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1107 assert_eq!(first.asks[0].side, OrderSide::Sell);
1108
1109 assert_eq!(first.asks[1].price, Price::from("11657.54"));
1110 assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1111 assert_eq!(first.asks[1].side, OrderSide::Sell);
1112
1113 assert_eq!(first.asks[2].price, Price::from("11657.56"));
1114 assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1115 assert_eq!(first.asks[2].side, OrderSide::Sell);
1116
1117 assert_eq!(first.asks[3].price, Price::from("11657.61"));
1118 assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1119 assert_eq!(first.asks[3].side, OrderSide::Sell);
1120
1121 assert_eq!(first.asks[4].price, Price::from("11657.92"));
1122 assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1123 assert_eq!(first.asks[4].side, OrderSide::Sell);
1124
1125 for i in 5..10 {
1127 assert_eq!(first.asks[i].price.raw, 0);
1128 assert_eq!(first.asks[i].size.raw, 0);
1129 assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1130 }
1131
1132 for i in 1..5 {
1134 assert!(
1135 first.bids[i].price < first.bids[i - 1].price,
1136 "Bid price at level {} should be less than level {}",
1137 i,
1138 i - 1
1139 );
1140 }
1141
1142 for i in 1..5 {
1144 assert!(
1145 first.asks[i].price > first.asks[i - 1].price,
1146 "Ask price at level {} should be greater than level {}",
1147 i,
1148 i - 1
1149 );
1150 }
1151
1152 assert!(
1154 first.asks[0].price > first.bids[0].price,
1155 "Best ask should be greater than best bid"
1156 );
1157
1158 for i in 0..5 {
1160 assert_eq!(first.bid_counts[i], 1);
1161 assert_eq!(first.ask_counts[i], 1);
1162 }
1163 for i in 5..10 {
1164 assert_eq!(first.bid_counts[i], 0);
1165 assert_eq!(first.ask_counts[i], 0);
1166 }
1167
1168 assert_eq!(
1170 first.flags,
1171 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1172 );
1173 assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1174 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1175 assert_eq!(first.sequence, 0);
1176 }
1177
1178 #[rstest]
1179 fn test_load_depth10_from_snapshot25_comprehensive() {
1180 let filepath = get_tardis_binance_snapshot25_path();
1181 let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1182
1183 assert_eq!(depths.len(), 10);
1184
1185 let first = &depths[0];
1186 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1187 assert_eq!(first.bids.len(), 10);
1188 assert_eq!(first.asks.len(), 10);
1189
1190 let expected_bids = vec![
1192 ("11657.07", "10.896"),
1193 ("11656.97", "0.2"),
1194 ("11655.78", "0.2"),
1195 ("11655.77", "0.98"),
1196 ("11655.68", "0.111"),
1197 ("11655.66", "0.077"),
1198 ("11655.57", "0.34"),
1199 ("11655.48", "0.4"),
1200 ("11655.26", "1.185"),
1201 ("11654.86", "0.195"),
1202 ];
1203
1204 for (i, (price, size)) in expected_bids.iter().enumerate() {
1205 assert_eq!(first.bids[i].price, Price::from(*price));
1206 assert_eq!(first.bids[i].size, Quantity::from(*size));
1207 assert_eq!(first.bids[i].side, OrderSide::Buy);
1208 }
1209
1210 let expected_asks = vec![
1212 ("11657.08", "1.714"),
1213 ("11657.54", "5.4"),
1214 ("11657.56", "0.238"),
1215 ("11657.61", "0.077"),
1216 ("11657.92", "0.918"),
1217 ("11658.09", "1.015"),
1218 ("11658.12", "0.665"),
1219 ("11658.19", "0.583"),
1220 ("11658.28", "0.255"),
1221 ("11658.29", "0.656"),
1222 ];
1223
1224 for (i, (price, size)) in expected_asks.iter().enumerate() {
1225 assert_eq!(first.asks[i].price, Price::from(*price));
1226 assert_eq!(first.asks[i].size, Quantity::from(*size));
1227 assert_eq!(first.asks[i].side, OrderSide::Sell);
1228 }
1229
1230 for i in 1..10 {
1232 assert!(
1233 first.bids[i].price < first.bids[i - 1].price,
1234 "Bid price at level {} ({}) should be less than level {} ({})",
1235 i,
1236 first.bids[i].price,
1237 i - 1,
1238 first.bids[i - 1].price
1239 );
1240 }
1241
1242 for i in 1..10 {
1244 assert!(
1245 first.asks[i].price > first.asks[i - 1].price,
1246 "Ask price at level {} ({}) should be greater than level {} ({})",
1247 i,
1248 first.asks[i].price,
1249 i - 1,
1250 first.asks[i - 1].price
1251 );
1252 }
1253
1254 assert!(
1256 first.asks[0].price > first.bids[0].price,
1257 "Best ask ({}) should be greater than best bid ({})",
1258 first.asks[0].price,
1259 first.bids[0].price
1260 );
1261
1262 for i in 0..10 {
1264 assert_eq!(first.bid_counts[i], 1);
1265 assert_eq!(first.ask_counts[i], 1);
1266 }
1267
1268 assert_eq!(
1270 first.flags,
1271 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1272 );
1273 assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1274 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1275 assert_eq!(first.sequence, 0);
1276 }
1277
1278 #[rstest]
1279 fn test_snapshot_csv_field_order_interleaved() {
1280 let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1284asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1285asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1286asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1287asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1288asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1289binance-futures,BTCUSDT,1000000,2000000,\
1290100.5,1.0,100.4,2.0,\
1291100.6,1.1,100.3,2.1,\
1292100.7,1.2,100.2,2.2,\
1293100.8,1.3,100.1,2.3,\
1294100.9,1.4,100.0,2.4";
1295
1296 let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1297 std::fs::write(&temp_file, csv_data).unwrap();
1298
1299 let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1300 assert_eq!(depths.len(), 1);
1301
1302 let depth = &depths[0];
1303
1304 assert_eq!(depth.bids[0].price, Price::from("100.4"));
1306 assert_eq!(depth.bids[1].price, Price::from("100.3"));
1307 assert_eq!(depth.bids[2].price, Price::from("100.2"));
1308 assert_eq!(depth.bids[3].price, Price::from("100.1"));
1309 assert_eq!(depth.bids[4].price, Price::from("100.0"));
1310
1311 assert_eq!(depth.asks[0].price, Price::from("100.5"));
1313 assert_eq!(depth.asks[1].price, Price::from("100.6"));
1314 assert_eq!(depth.asks[2].price, Price::from("100.7"));
1315 assert_eq!(depth.asks[3].price, Price::from("100.8"));
1316 assert_eq!(depth.asks[4].price, Price::from("100.9"));
1317
1318 assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1320 assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1321
1322 std::fs::remove_file(temp_file).unwrap();
1323 }
1324
1325 #[rstest]
1326 fn test_load_deltas_limit_includes_clear_deltas() {
1327 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1330binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1331binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1332binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1333binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1334binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
1335binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,2.0
1336binance-futures,BTCUSDT,1640995205000000,1640995205100000,false,bid,49997.0,0.5";
1337
1338 let temp_file = std::env::temp_dir().join("test_load_deltas_limit.csv");
1339 std::fs::write(&temp_file, csv_data).unwrap();
1340
1341 let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(5)).unwrap();
1343
1344 assert_eq!(deltas.len(), 5);
1346 assert_eq!(deltas[0].action, BookAction::Clear);
1347 assert_eq!(deltas[1].action, BookAction::Add);
1348 assert_eq!(deltas[2].action, BookAction::Add);
1349 assert_eq!(deltas[3].action, BookAction::Update);
1350 assert_eq!(deltas[4].action, BookAction::Update);
1351
1352 assert_eq!(deltas[3].order.price, parse_price(49999.0, 1));
1354
1355 std::fs::remove_file(&temp_file).ok();
1356 }
1357
1358 #[rstest]
1359 fn test_load_deltas_limit_stops_at_clear() {
1360 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1362binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1363binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
1364
1365 let temp_file = std::env::temp_dir().join("test_load_deltas_limit_stops_at_clear.csv");
1366 std::fs::write(&temp_file, csv_data).unwrap();
1367
1368 let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(1)).unwrap();
1370
1371 assert_eq!(deltas.len(), 1);
1372 assert_eq!(deltas[0].action, BookAction::Clear);
1373
1374 std::fs::remove_file(&temp_file).ok();
1375 }
1376
1377 #[rstest]
1378 fn test_load_deltas_limit_with_mid_day_snapshot() {
1379 let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
1382 let deltas = load_deltas(filepath, Some(1), Some(1), None, Some(5)).unwrap();
1383
1384 assert_eq!(deltas.len(), 5);
1387 assert_eq!(deltas[0].action, BookAction::Clear);
1388 }
1389
1390 #[rstest]
1393 #[ignore = "one-time dataset curation, not for routine CI"]
1394 fn test_curate_deribit_deltas() {
1395 let csv_path = get_test_data_root()
1396 .join("large")
1397 .join("tardis_deribit_incremental_book_L2_2020-04-01_BTC-PERPETUAL.csv.gz");
1398
1399 let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
1400 let parquet_path = "/tmp/tardis_BTC-PERPETUAL.DERIBIT_2020-04-01_deltas.parquet";
1401
1402 println!("Loading deltas from {}", csv_path.display());
1403 let deltas = load_deltas(&csv_path, None, None, Some(instrument_id), None).unwrap();
1404 let count = deltas.len();
1405 println!("Loaded {count} deltas");
1406
1407 let sample = deltas
1408 .iter()
1409 .find(|d| d.order.price.precision > 0)
1410 .expect("Should have at least one non-CLEAR delta");
1411 let price_precision = sample.order.price.precision;
1412 let size_precision = sample.order.size.precision;
1413 println!("Precision: price={price_precision}, size={size_precision}");
1414
1415 let metadata =
1417 OrderBookDelta::get_metadata(&instrument_id, price_precision, size_precision);
1418 let schema = OrderBookDelta::get_schema(Some(metadata.clone()));
1419
1420 println!("Writing Parquet to {parquet_path}");
1421 let file = File::create(parquet_path).unwrap();
1422 let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
1423 let props = WriterProperties::builder()
1424 .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
1425 .set_max_row_group_size(1_000_000)
1426 .build();
1427 let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
1428
1429 let chunk_size = 1_000_000;
1430 for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
1431 println!(" Encoding chunk {} ({} records)...", i + 1, chunk.len());
1432 let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
1433 writer.write(&batch).unwrap();
1434 }
1435 writer.close().unwrap();
1436
1437 let file_size = fs::metadata(parquet_path).unwrap().len();
1438 println!("\n=== CURATION COMPLETE ===");
1439 println!("Records: {count}");
1440 println!("Price precision: {price_precision}");
1441 println!("Size precision: {size_precision}");
1442 println!(
1443 "File size: {} bytes ({:.1} MB)",
1444 file_size,
1445 file_size as f64 / 1_048_576.0
1446 );
1447 println!("Output: {parquet_path}");
1448 println!("\nNext steps:");
1449 println!(" sha256sum {parquet_path}");
1450 }
1451}