1use std::{io::Read, path::Path};
17
18use csv::{Reader, StringRecord};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
22 enums::{OrderSide, RecordFlag},
23 identifiers::InstrumentId,
24 types::Quantity,
25};
26#[cfg(feature = "python")]
27use nautilus_model::{
28 data::{Data, OrderBookDeltas, OrderBookDeltas_API},
29 python::data::data_to_pycapsule,
30};
31#[cfg(feature = "python")]
32use pyo3::{Py, PyAny, Python};
33
34use crate::{
35 csv::{
36 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
37 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
38 record::{
39 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
40 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
41 },
42 },
43 parse::{parse_instrument_id, parse_timestamp},
44};
45
/// Streaming iterator that yields chunks of [`OrderBookDelta`] parsed from a Tardis
/// book-update CSV file.
struct DeltaStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn std::io::Read>>,
    /// Reusable CSV record buffer (avoids a fresh allocation per row).
    record: StringRecord,
    /// Accumulates deltas for the chunk currently being built.
    buffer: Vec<OrderBookDelta>,
    /// Maximum number of deltas per yielded chunk.
    chunk_size: usize,
    /// Overrides the instrument ID parsed from each record when `Some`.
    instrument_id: Option<InstrumentId>,
    /// Decimal precision applied to parsed prices.
    price_precision: u8,
    /// Decimal precision applied to parsed sizes.
    size_precision: u8,
    /// Event timestamp of the most recently parsed record, used to place
    /// `F_LAST` flags at timestamp boundaries.
    last_ts_event: UnixNanos,
    /// Whether the previous record was part of a snapshot, used to detect the
    /// start of a new snapshot (which triggers a book `clear` delta).
    last_is_snapshot: bool,
    /// Optional cap on the total number of deltas emitted across all chunks.
    limit: Option<usize>,
    /// Running count of emitted deltas, compared against `limit`.
    deltas_emitted: usize,
    /// Record deferred to the next iteration when a chunk filled up right after
    /// a snapshot-start `clear` delta was pushed.
    pending_record: Option<TardisBookUpdateRecord>,
}
67
68impl DeltaStreamIterator {
69 fn new<P: AsRef<Path>>(
75 filepath: P,
76 chunk_size: usize,
77 price_precision: Option<u8>,
78 size_precision: Option<u8>,
79 instrument_id: Option<InstrumentId>,
80 limit: Option<usize>,
81 ) -> anyhow::Result<Self> {
82 let (final_price_precision, final_size_precision) =
83 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
84 (price_prec, size_prec)
86 } else {
87 let mut reader = create_csv_reader(&filepath)?;
89 let mut record = StringRecord::new();
90 let (detected_price, detected_size) =
91 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
92 (
93 price_precision.unwrap_or(detected_price),
94 size_precision.unwrap_or(detected_size),
95 )
96 };
97
98 let reader = create_csv_reader(filepath)?;
99
100 Ok(Self {
101 reader,
102 record: StringRecord::new(),
103 buffer: Vec::with_capacity(chunk_size),
104 chunk_size,
105 instrument_id,
106 price_precision: final_price_precision,
107 size_precision: final_size_precision,
108 last_ts_event: UnixNanos::default(),
109 last_is_snapshot: false,
110 limit,
111 deltas_emitted: 0,
112 pending_record: None,
113 })
114 }
115
116 fn detect_precision_from_sample(
117 reader: &mut Reader<Box<dyn std::io::Read>>,
118 record: &mut StringRecord,
119 sample_size: usize,
120 ) -> anyhow::Result<(u8, u8)> {
121 let mut max_price_precision = 0u8;
122 let mut max_size_precision = 0u8;
123 let mut records_scanned = 0;
124
125 while records_scanned < sample_size {
126 match reader.read_record(record) {
127 Ok(true) => {
128 if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
129 max_price_precision = max_price_precision.max(infer_precision(data.price));
130 max_size_precision = max_size_precision.max(infer_precision(data.amount));
131 records_scanned += 1;
132 }
133 }
134 Ok(false) => break, Err(_) => records_scanned += 1, }
137 }
138
139 Ok((max_price_precision, max_size_precision))
140 }
141}
142
impl Iterator for DeltaStreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDelta>>;

    /// Yields the next chunk of deltas, ending each timestamp group (and the final
    /// delta of the stream) with the `F_LAST` flag.
    fn next(&mut self) -> Option<Self::Item> {
        // Stop entirely once the configured limit has been reached.
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();

        loop {
            if self.buffer.len() >= self.chunk_size {
                break;
            }

            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            // Prefer a record carried over from the previous call (set when a chunk
            // filled up immediately after a snapshot-start clear delta).
            let data = if let Some(pending) = self.pending_record.take() {
                pending
            } else {
                match self.reader.read_record(&mut self.record) {
                    Ok(true) => match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    },
                    Ok(false) => {
                        // EOF: flag the final delta and flush the partial chunk.
                        if self.buffer.is_empty() {
                            return None;
                        }
                        if let Some(last_delta) = self.buffer.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        return Some(Ok(self.buffer.clone()));
                    }
                    Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
                }
            };

            // A snapshot is starting: emit a book-clear delta before its deltas.
            if data.is_snapshot && !self.last_is_snapshot {
                let clear_instrument_id = self
                    .instrument_id
                    .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
                let ts_event = parse_timestamp(data.timestamp);
                let ts_init = parse_timestamp(data.local_timestamp);

                // Close out the previous timestamp group before the clear.
                if self.last_ts_event != ts_event
                    && let Some(last_delta) = self.buffer.last_mut()
                {
                    last_delta.flags = RecordFlag::F_LAST.value();
                }
                self.last_ts_event = ts_event;

                let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
                self.buffer.push(clear_delta);
                self.deltas_emitted += 1;

                // If the clear filled the chunk (or hit the limit), defer the record
                // itself to the next call so its delta is not lost.
                if self.buffer.len() >= self.chunk_size
                    || self.limit.is_some_and(|l| self.deltas_emitted >= l)
                {
                    self.last_is_snapshot = data.is_snapshot;
                    self.pending_record = Some(data);
                    break;
                }
            }
            self.last_is_snapshot = data.is_snapshot;

            let delta = match parse_delta_record(
                &data,
                self.price_precision,
                self.size_precision,
                self.instrument_id,
            ) {
                Ok(d) => d,
                Err(e) => {
                    // Invalid rows are skipped rather than aborting the stream.
                    log::warn!("Skipping invalid delta record: {e}");
                    continue;
                }
            };

            // New event timestamp: flag the last delta of the previous group.
            if self.last_ts_event != delta.ts_event
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }

            self.last_ts_event = delta.ts_event;

            self.buffer.push(delta);
            self.deltas_emitted += 1;
        }

        if self.buffer.is_empty() {
            None
        } else {
            // If this chunk exhausted the limit, it is the last one ever yielded,
            // so its final delta must carry F_LAST.
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            Some(Ok(self.buffer.clone()))
        }
    }
}
260
/// Streams [`OrderBookDelta`]s from a Tardis book-update CSV file in chunks of up to
/// `chunk_size` deltas.
///
/// When `price_precision` or `size_precision` is `None` it is inferred from a sample of
/// the file. `instrument_id` overrides the ID parsed from each record, and `limit` caps
/// the total number of deltas emitted.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or the precision sample cannot be read.
pub fn stream_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
    DeltaStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
292
/// Streaming iterator that groups Tardis book-update deltas into per-`ts_event`
/// batches and converts each batch into a Python capsule (`Data::Deltas`).
#[cfg(feature = "python")]
struct BatchedDeltasStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn std::io::Read>>,
    /// Reusable CSV record buffer.
    record: StringRecord,
    /// PyCapsules accumulated for the chunk currently being yielded.
    buffer: Vec<Py<PyAny>>,
    /// Deltas sharing the current `ts_event`, not yet closed into a batch.
    current_batch: Vec<OrderBookDelta>,
    /// Completed batches awaiting conversion to capsules.
    pending_batches: Vec<Vec<OrderBookDelta>>,
    /// Maximum number of batches per yielded chunk.
    chunk_size: usize,
    /// Instrument ID applied to every batch (resolved once in `new`).
    instrument_id: InstrumentId,
    /// Decimal precision applied to parsed prices.
    price_precision: u8,
    /// Decimal precision applied to parsed sizes.
    size_precision: u8,
    /// `ts_event` of the most recently read record, used for batch boundaries.
    last_ts_event: UnixNanos,
    /// Whether the previous record was part of a snapshot.
    last_is_snapshot: bool,
    /// Optional cap on the total number of deltas emitted.
    limit: Option<usize>,
    /// Running count of emitted deltas, compared against `limit`.
    deltas_emitted: usize,
}
314
#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    /// Creates a new batched delta stream over `filepath`.
    ///
    /// Reads the first record up front to resolve the instrument ID when one is not
    /// supplied, infers any missing precision from a sample, then re-opens the file so
    /// streaming starts from the first record.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                // Continue sampling from the same reader (first record already consumed).
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        // Re-open so iteration starts at the first record again.
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            last_is_snapshot: false,
            limit,
            deltas_emitted: 0,
        })
    }

    /// Scans up to `sample_size` book-update records and returns the maximum observed
    /// `(price, size)` decimal precisions.
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    // Rows that fail to deserialize are skipped without counting.
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break, // EOF
                // Count unreadable rows so a corrupt file still terminates the scan.
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }

    /// Reads records until `chunk_size` complete batches are staged in
    /// `pending_batches` (or EOF/limit is hit).
    ///
    /// Returns `None` when nothing was staged (end of stream), `Some(Ok(()))` when at
    /// least one batch is ready, and `Some(Err(_))` on a read/deserialize failure.
    fn fill_pending_batches(&mut self) -> Option<anyhow::Result<()>> {
        self.pending_batches.clear();
        let mut batches_created = 0;

        while batches_created < self.chunk_size {
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let data = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    };

                    let ts_event = parse_timestamp(data.timestamp);
                    let ts_init = parse_timestamp(data.local_timestamp);

                    // Parse before any state is mutated so invalid rows are skipped
                    // without disturbing batch boundaries.
                    let delta = match parse_delta_record(
                        &data,
                        self.price_precision,
                        self.size_precision,
                        Some(self.instrument_id),
                    ) {
                        Ok(d) => d,
                        Err(e) => {
                            log::warn!("Skipping invalid delta record: {e}");
                            continue;
                        }
                    };

                    // Timestamp changed: close the current batch with F_LAST.
                    if self.last_ts_event != ts_event && !self.current_batch.is_empty() {
                        if let Some(last_delta) = self.current_batch.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        self.pending_batches
                            .push(std::mem::take(&mut self.current_batch));
                        batches_created += 1;
                    }

                    self.last_ts_event = ts_event;

                    // Snapshot starting: emit a book-clear delta ahead of its deltas.
                    if data.is_snapshot && !self.last_is_snapshot {
                        let clear_delta =
                            OrderBookDelta::clear(self.instrument_id, 0, ts_event, ts_init);
                        self.current_batch.push(clear_delta);
                        self.deltas_emitted += 1;

                        if let Some(limit) = self.limit
                            && self.deltas_emitted >= limit
                        {
                            self.last_is_snapshot = data.is_snapshot;
                            break;
                        }
                    }
                    self.last_is_snapshot = data.is_snapshot;

                    self.current_batch.push(delta);
                    self.deltas_emitted += 1;

                    if let Some(limit) = self.limit
                        && self.deltas_emitted >= limit
                    {
                        break;
                    }
                }
                Ok(false) => {
                    break;
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        // Flush a trailing partial batch (EOF or limit reached mid-batch).
        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
            if let Some(last_delta) = self.current_batch.last_mut() {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            self.pending_batches
                .push(std::mem::take(&mut self.current_batch));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            Some(Ok(()))
        }
    }
}
500
#[cfg(feature = "python")]
impl Iterator for BatchedDeltasStreamIterator {
    type Item = anyhow::Result<Vec<Py<PyAny>>>;

    /// Yields the next chunk of per-timestamp batches, each converted to a
    /// `Data::Deltas` PyCapsule.
    fn next(&mut self) -> Option<Self::Item> {
        // Stop entirely once the configured delta limit has been reached.
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();
        if let Some(Err(e)) = self.fill_pending_batches() {
            return Some(Err(e));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            // Convert each staged batch into a capsule while holding the interpreter.
            Python::attach(|py| {
                for batch in self.pending_batches.drain(..) {
                    let deltas = OrderBookDeltas::new(self.instrument_id, batch);
                    let deltas = OrderBookDeltas_API::new(deltas);
                    let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
                    self.buffer.push(capsule);
                }
            });
            Some(Ok(std::mem::take(&mut self.buffer)))
        }
    }
}
533
/// Streams Tardis book-update deltas as chunks of Python `OrderBookDeltas` capsules,
/// one capsule per `ts_event` batch, up to `chunk_size` batches per chunk.
///
/// Precisions not supplied are inferred from a sample of the file; `instrument_id`
/// overrides the ID parsed from the first record; `limit` caps total deltas emitted.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, is empty, or the first record cannot
/// be deserialized.
#[cfg(feature = "python")]
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>> {
    BatchedDeltasStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
558
/// Streaming iterator that yields chunks of [`QuoteTick`] parsed from a Tardis quote
/// CSV file.
struct QuoteStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Reusable CSV record buffer.
    record: StringRecord,
    /// Accumulates quotes for the chunk currently being built.
    buffer: Vec<QuoteTick>,
    /// Maximum number of quotes per yielded chunk.
    chunk_size: usize,
    /// Overrides the instrument ID parsed from each record when `Some`.
    instrument_id: Option<InstrumentId>,
    /// Decimal precision applied to parsed prices.
    price_precision: u8,
    /// Decimal precision applied to parsed sizes.
    size_precision: u8,
    /// Optional cap on the total number of quotes emitted.
    limit: Option<usize>,
    /// Running count of emitted quotes, compared against `limit`.
    records_processed: usize,
}
575
576impl QuoteStreamIterator {
577 pub fn new<P: AsRef<Path>>(
583 filepath: P,
584 chunk_size: usize,
585 price_precision: Option<u8>,
586 size_precision: Option<u8>,
587 instrument_id: Option<InstrumentId>,
588 limit: Option<usize>,
589 ) -> anyhow::Result<Self> {
590 let (final_price_precision, final_size_precision) =
591 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
592 (price_prec, size_prec)
594 } else {
595 let mut reader = create_csv_reader(&filepath)?;
597 let mut record = StringRecord::new();
598 let (detected_price, detected_size) =
599 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
600 (
601 price_precision.unwrap_or(detected_price),
602 size_precision.unwrap_or(detected_size),
603 )
604 };
605
606 let reader = create_csv_reader(filepath)?;
607
608 Ok(Self {
609 reader,
610 record: StringRecord::new(),
611 buffer: Vec::with_capacity(chunk_size),
612 chunk_size,
613 instrument_id,
614 price_precision: final_price_precision,
615 size_precision: final_size_precision,
616 limit,
617 records_processed: 0,
618 })
619 }
620
621 fn detect_precision_from_sample(
622 reader: &mut Reader<Box<dyn std::io::Read>>,
623 record: &mut StringRecord,
624 sample_size: usize,
625 ) -> anyhow::Result<(u8, u8)> {
626 let mut max_price_precision = 2u8;
627 let mut max_size_precision = 0u8;
628 let mut records_scanned = 0;
629
630 while records_scanned < sample_size {
631 match reader.read_record(record) {
632 Ok(true) => {
633 if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
634 if let Some(bid_price_val) = data.bid_price {
635 max_price_precision =
636 max_price_precision.max(infer_precision(bid_price_val));
637 }
638 if let Some(ask_price_val) = data.ask_price {
639 max_price_precision =
640 max_price_precision.max(infer_precision(ask_price_val));
641 }
642 if let Some(bid_amount_val) = data.bid_amount {
643 max_size_precision =
644 max_size_precision.max(infer_precision(bid_amount_val));
645 }
646 if let Some(ask_amount_val) = data.ask_amount {
647 max_size_precision =
648 max_size_precision.max(infer_precision(ask_amount_val));
649 }
650 records_scanned += 1;
651 }
652 }
653 Ok(false) => break, Err(_) => records_scanned += 1, }
656 }
657
658 Ok((max_price_precision, max_size_precision))
659 }
660}
661
662impl Iterator for QuoteStreamIterator {
663 type Item = anyhow::Result<Vec<QuoteTick>>;
664
665 fn next(&mut self) -> Option<Self::Item> {
666 if let Some(limit) = self.limit
667 && self.records_processed >= limit
668 {
669 return None;
670 }
671
672 self.buffer.clear();
673 let mut records_read = 0;
674
675 while records_read < self.chunk_size {
676 match self.reader.read_record(&mut self.record) {
677 Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
678 Ok(data) => {
679 let quote = parse_quote_record(
680 &data,
681 self.price_precision,
682 self.size_precision,
683 self.instrument_id,
684 );
685
686 self.buffer.push(quote);
687 records_read += 1;
688 self.records_processed += 1;
689
690 if let Some(limit) = self.limit
691 && self.records_processed >= limit
692 {
693 break;
694 }
695 }
696 Err(e) => {
697 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
698 }
699 },
700 Ok(false) => {
701 if self.buffer.is_empty() {
702 return None;
703 }
704 return Some(Ok(self.buffer.clone()));
705 }
706 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
707 }
708 }
709
710 if self.buffer.is_empty() {
711 None
712 } else {
713 Some(Ok(self.buffer.clone()))
714 }
715 }
716}
717
/// Streams [`QuoteTick`]s from a Tardis quote CSV file in chunks of up to `chunk_size`.
///
/// When `price_precision` or `size_precision` is `None` it is inferred from a sample of
/// the file. `instrument_id` overrides the ID parsed from each record, and `limit` caps
/// the total number of quotes emitted.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or the precision sample cannot be read.
pub fn stream_quotes<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
    QuoteStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
749
/// Streaming iterator that yields chunks of [`TradeTick`] parsed from a Tardis trade
/// CSV file.
struct TradeStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Reusable CSV record buffer.
    record: StringRecord,
    /// Accumulates trades for the chunk currently being built.
    buffer: Vec<TradeTick>,
    /// Maximum number of trades per yielded chunk.
    chunk_size: usize,
    /// Overrides the instrument ID parsed from each record when `Some`.
    instrument_id: Option<InstrumentId>,
    /// Decimal precision applied to parsed prices.
    price_precision: u8,
    /// Decimal precision applied to parsed sizes.
    size_precision: u8,
    /// Optional cap on the total number of trades emitted.
    limit: Option<usize>,
    /// Running count of emitted trades, compared against `limit`.
    records_processed: usize,
}
766
767impl TradeStreamIterator {
768 pub fn new<P: AsRef<Path>>(
774 filepath: P,
775 chunk_size: usize,
776 price_precision: Option<u8>,
777 size_precision: Option<u8>,
778 instrument_id: Option<InstrumentId>,
779 limit: Option<usize>,
780 ) -> anyhow::Result<Self> {
781 let (final_price_precision, final_size_precision) =
782 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
783 (price_prec, size_prec)
785 } else {
786 let mut reader = create_csv_reader(&filepath)?;
788 let mut record = StringRecord::new();
789 let (detected_price, detected_size) =
790 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
791 (
792 price_precision.unwrap_or(detected_price),
793 size_precision.unwrap_or(detected_size),
794 )
795 };
796
797 let reader = create_csv_reader(filepath)?;
798
799 Ok(Self {
800 reader,
801 record: StringRecord::new(),
802 buffer: Vec::with_capacity(chunk_size),
803 chunk_size,
804 instrument_id,
805 price_precision: final_price_precision,
806 size_precision: final_size_precision,
807 limit,
808 records_processed: 0,
809 })
810 }
811
812 fn detect_precision_from_sample(
813 reader: &mut Reader<Box<dyn std::io::Read>>,
814 record: &mut StringRecord,
815 sample_size: usize,
816 ) -> anyhow::Result<(u8, u8)> {
817 let mut max_price_precision = 2u8;
818 let mut max_size_precision = 0u8;
819 let mut records_scanned = 0;
820
821 while records_scanned < sample_size {
822 match reader.read_record(record) {
823 Ok(true) => {
824 if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
825 max_price_precision = max_price_precision.max(infer_precision(data.price));
826 max_size_precision = max_size_precision.max(infer_precision(data.amount));
827 records_scanned += 1;
828 }
829 }
830 Ok(false) => break, Err(_) => records_scanned += 1, }
833 }
834
835 Ok((max_price_precision, max_size_precision))
836 }
837}
838
839impl Iterator for TradeStreamIterator {
840 type Item = anyhow::Result<Vec<TradeTick>>;
841
842 fn next(&mut self) -> Option<Self::Item> {
843 if let Some(limit) = self.limit
844 && self.records_processed >= limit
845 {
846 return None;
847 }
848
849 self.buffer.clear();
850 let mut records_read = 0;
851
852 while records_read < self.chunk_size {
853 match self.reader.read_record(&mut self.record) {
854 Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
855 Ok(data) => {
856 let size = Quantity::new(data.amount, self.size_precision);
857
858 if size.is_positive() {
859 let trade = parse_trade_record(
860 &data,
861 size,
862 self.price_precision,
863 self.instrument_id,
864 );
865
866 self.buffer.push(trade);
867 records_read += 1;
868 self.records_processed += 1;
869
870 if let Some(limit) = self.limit
871 && self.records_processed >= limit
872 {
873 break;
874 }
875 } else {
876 log::warn!("Skipping zero-sized trade: {data:?}");
877 }
878 }
879 Err(e) => {
880 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
881 }
882 },
883 Ok(false) => {
884 if self.buffer.is_empty() {
885 return None;
886 }
887 return Some(Ok(self.buffer.clone()));
888 }
889 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
890 }
891 }
892
893 if self.buffer.is_empty() {
894 None
895 } else {
896 Some(Ok(self.buffer.clone()))
897 }
898 }
899}
900
/// Streams [`TradeTick`]s from a Tardis trade CSV file in chunks of up to `chunk_size`.
///
/// Zero-sized trades are skipped with a warning. When `price_precision` or
/// `size_precision` is `None` it is inferred from a sample of the file; `instrument_id`
/// overrides the ID parsed from each record; `limit` caps the total trades emitted.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or the precision sample cannot be read.
pub fn stream_trades<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
    TradeStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
932
/// Streaming iterator that yields chunks of [`OrderBookDepth10`] built from Tardis
/// 5-level or 25-level order book snapshot CSV files.
struct Depth10StreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Reusable CSV record buffer.
    record: StringRecord,
    /// Accumulates depths for the chunk currently being built.
    buffer: Vec<OrderBookDepth10>,
    /// Maximum number of depths per yielded chunk.
    chunk_size: usize,
    /// Source snapshot depth: must be 5 or 25 (validated in `new`).
    levels: u8,
    /// Overrides the instrument ID parsed from each record when `Some`.
    instrument_id: Option<InstrumentId>,
    /// Decimal precision applied to parsed prices.
    price_precision: u8,
    /// Decimal precision applied to parsed sizes.
    size_precision: u8,
    /// Optional cap on the total number of depths emitted.
    limit: Option<usize>,
    /// Running count of emitted depths, compared against `limit`.
    records_processed: usize,
}
950
impl Depth10StreamIterator {
    /// Creates a new depth stream over `filepath`.
    ///
    /// `levels` selects the source snapshot schema (5 or 25). Any precision not
    /// supplied is inferred from a sample of up to 10,000 records, after which the file
    /// is re-opened so streaming starts from the first record.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        levels: u8,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        anyhow::ensure!(
            levels == 5 || levels == 25,
            "Invalid levels: {levels}. Must be 5 or 25."
        );

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                // At least one precision is missing: infer it from a sample scan.
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        // Re-open so iteration starts at the first record again.
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            levels,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    /// Converts a 5-level snapshot record into an [`OrderBookDepth10`]; levels 5-9 on
    /// both sides remain `NULL_ORDER` with zero counts.
    fn process_snapshot5(&mut self, data: TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Each level lives in its own pair of columns, so select fields by index.
        for i in 0..5 {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0; // Not provided by the Tardis snapshot format
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    /// Converts a 25-level snapshot record into an [`OrderBookDepth10`], keeping only
    /// the top 10 levels per side.
    fn process_snapshot25(&mut self, data: TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Only the first DEPTH10_LEN (10) of the 25 levels are consumed.
        for i in 0..DEPTH10_LEN {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                5 => (data.bids_5_price, data.bids_5_amount),
                6 => (data.bids_6_price, data.bids_6_amount),
                7 => (data.bids_7_price, data.bids_7_amount),
                8 => (data.bids_8_price, data.bids_8_amount),
                9 => (data.bids_9_price, data.bids_9_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                5 => (data.asks_5_price, data.asks_5_amount),
                6 => (data.asks_6_price, data.asks_6_amount),
                7 => (data.asks_7_price, data.asks_7_amount),
                8 => (data.asks_8_price, data.asks_8_amount),
                9 => (data.asks_9_price, data.asks_9_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0; // Not provided by the Tardis snapshot format
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    /// Scans up to `sample_size` records and returns the maximum observed
    /// `(price, size)` decimal precisions from the top-of-book columns.
    ///
    /// Tries the 5-level schema first and falls back to the 25-level schema.
    /// NOTE(review): this assumes a 25-level row does not also deserialize as a
    /// 5-level record — confirm against the record definitions.
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    } else if let Ok(data) =
                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
                    {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break, // EOF
                // Count unreadable rows so a corrupt file still terminates the scan.
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}
1212
1213impl Iterator for Depth10StreamIterator {
1214 type Item = anyhow::Result<Vec<OrderBookDepth10>>;
1215
1216 fn next(&mut self) -> Option<Self::Item> {
1217 if let Some(limit) = self.limit
1218 && self.records_processed >= limit
1219 {
1220 return None;
1221 }
1222
1223 if !self.buffer.is_empty() {
1224 let chunk = self.buffer.split_off(0);
1225 return Some(Ok(chunk));
1226 }
1227
1228 self.buffer.clear();
1229 let mut records_read = 0;
1230
1231 while records_read < self.chunk_size {
1232 match self.reader.read_record(&mut self.record) {
1233 Ok(true) => {
1234 let result = match self.levels {
1235 5 => self
1236 .record
1237 .deserialize::<TardisOrderBookSnapshot5Record>(None)
1238 .map(|data| self.process_snapshot5(data)),
1239 25 => self
1240 .record
1241 .deserialize::<TardisOrderBookSnapshot25Record>(None)
1242 .map(|data| self.process_snapshot25(data)),
1243 _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
1244 };
1245
1246 match result {
1247 Ok(depth) => {
1248 self.buffer.push(depth);
1249 records_read += 1;
1250 self.records_processed += 1;
1251
1252 if let Some(limit) = self.limit
1253 && self.records_processed >= limit
1254 {
1255 break;
1256 }
1257 }
1258 Err(e) => {
1259 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
1260 }
1261 }
1262 }
1263 Ok(false) => {
1264 if self.buffer.is_empty() {
1265 return None;
1266 }
1267 let chunk = self.buffer.split_off(0);
1268 return Some(Ok(chunk));
1269 }
1270 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1271 }
1272 }
1273
1274 if self.buffer.is_empty() {
1275 None
1276 } else {
1277 let chunk = self.buffer.split_off(0);
1278 Some(Ok(chunk))
1279 }
1280 }
1281}
1282
/// Streams [`OrderBookDepth10`]s built from a Tardis 5-level snapshot CSV file in
/// chunks of up to `chunk_size`; levels 5-9 are padded with null orders.
///
/// Precisions not supplied are inferred from a sample of the file; `instrument_id`
/// overrides the ID parsed from each record; `limit` caps the total depths emitted.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or the precision sample cannot be read.
pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        5,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
1315
/// Streams [`OrderBookDepth10`]s built from a Tardis 25-level snapshot CSV file in
/// chunks of up to `chunk_size`; only the top 10 levels per side are kept.
///
/// Precisions not supplied are inferred from a sample of the file; `instrument_id`
/// overrides the ID parsed from each record; `limit` caps the total depths emitted.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or the precision sample cannot be read.
pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        25,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
1348
1349use nautilus_model::data::FundingRateUpdate;
1354
1355use crate::csv::record::TardisDerivativeTickerRecord;
1356
/// Streaming iterator over a Tardis derivative-ticker CSV file that yields chunks
/// of parsed [`FundingRateUpdate`] values.
struct FundingRateStreamIterator {
    reader: Reader<Box<dyn Read>>,       // CSV reader (boxed source from `create_csv_reader`)
    record: StringRecord,                // reusable row buffer, avoids per-row allocation
    buffer: Vec<FundingRateUpdate>,      // updates accumulated for the next chunk
    chunk_size: usize,                   // max updates per yielded chunk
    instrument_id: Option<InstrumentId>, // optional ID forwarded to the record parser
    limit: Option<usize>,                // max CSV records to process (counts non-funding rows too)
    records_processed: usize,            // running count of records read
}
1367
1368impl FundingRateStreamIterator {
1369 fn new<P: AsRef<Path>>(
1375 filepath: P,
1376 chunk_size: usize,
1377 instrument_id: Option<InstrumentId>,
1378 limit: Option<usize>,
1379 ) -> anyhow::Result<Self> {
1380 let reader = create_csv_reader(filepath)?;
1381
1382 Ok(Self {
1383 reader,
1384 record: StringRecord::new(),
1385 buffer: Vec::with_capacity(chunk_size),
1386 chunk_size,
1387 instrument_id,
1388 limit,
1389 records_processed: 0,
1390 })
1391 }
1392}
1393
1394impl Iterator for FundingRateStreamIterator {
1395 type Item = anyhow::Result<Vec<FundingRateUpdate>>;
1396
1397 fn next(&mut self) -> Option<Self::Item> {
1398 if let Some(limit) = self.limit
1399 && self.records_processed >= limit
1400 {
1401 return None;
1402 }
1403
1404 if !self.buffer.is_empty() {
1405 let chunk = self.buffer.split_off(0);
1406 return Some(Ok(chunk));
1407 }
1408
1409 self.buffer.clear();
1410 let mut records_read = 0;
1411
1412 while records_read < self.chunk_size {
1413 match self.reader.read_record(&mut self.record) {
1414 Ok(true) => {
1415 let result = self
1416 .record
1417 .deserialize::<TardisDerivativeTickerRecord>(None)
1418 .map_err(anyhow::Error::from)
1419 .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));
1420
1421 match result {
1422 Ok(Some(funding_rate)) => {
1423 self.buffer.push(funding_rate);
1424 records_read += 1;
1425 self.records_processed += 1;
1426
1427 if let Some(limit) = self.limit
1428 && self.records_processed >= limit
1429 {
1430 break;
1431 }
1432 }
1433 Ok(None) => {
1434 self.records_processed += 1;
1436 }
1437 Err(e) => {
1438 return Some(Err(anyhow::anyhow!(
1439 "Failed to parse funding rate record: {e}"
1440 )));
1441 }
1442 }
1443 }
1444 Ok(false) => {
1445 if self.buffer.is_empty() {
1446 return None;
1447 }
1448 let chunk = self.buffer.split_off(0);
1449 return Some(Ok(chunk));
1450 }
1451 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1452 }
1453 }
1454
1455 if self.buffer.is_empty() {
1456 None
1457 } else {
1458 let chunk = self.buffer.split_off(0);
1459 Some(Ok(chunk))
1460 }
1461 }
1462}
1463
/// Streams chunks of [`FundingRateUpdate`] parsed from a Tardis derivative-ticker CSV file.
///
/// Each yielded item is a `Vec` of up to `chunk_size` updates; records without
/// funding rate data are skipped but still count towards `limit`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read.
pub fn stream_funding_rates<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
}
1481
1482#[cfg(test)]
1483mod tests {
1484 use nautilus_model::{
1485 enums::{AggressorSide, BookAction},
1486 identifiers::TradeId,
1487 types::Price,
1488 };
1489 use rstest::*;
1490
1491 use super::*;
1492 use crate::{common::testing::get_test_data_path, csv::load::load_deltas, parse::parse_price};
1493
    // Verifies decimal-precision inference across magnitudes, signs, and zero
    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321, 9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        assert_eq!(infer_precision(input), expected);
    }
1505
1506 #[rstest]
1507 pub fn test_stream_deltas_chunked() {
1508 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1509binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1510binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1511binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1512binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
1513binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
1514
1515 let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
1516 std::fs::write(&temp_file, csv_data).unwrap();
1517
1518 let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1519 let chunks: Vec<_> = stream.collect();
1520
1521 assert_eq!(chunks.len(), 3);
1523
1524 let chunk1 = chunks[0].as_ref().unwrap();
1525 assert_eq!(chunk1.len(), 2);
1526 assert_eq!(chunk1[0].action, BookAction::Clear); assert_eq!(chunk1[1].order.price.precision, 4); let chunk2 = chunks[1].as_ref().unwrap();
1530 assert_eq!(chunk2.len(), 2);
1531 assert_eq!(chunk2[0].order.price.precision, 4);
1532 assert_eq!(chunk2[1].order.price.precision, 4);
1533
1534 let chunk3 = chunks[2].as_ref().unwrap();
1535 assert_eq!(chunk3.len(), 2);
1536 assert_eq!(chunk3[0].order.price.precision, 4);
1537 assert_eq!(chunk3[1].order.price.precision, 4);
1538
1539 let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1540 assert_eq!(total_deltas, 6);
1541
1542 std::fs::remove_file(&temp_file).ok();
1543 }
1544
    // Verifies that the injected CLEAR counts towards `limit`, and that without a
    // limit the 5 rows expand to 6 deltas (CLEAR + 5) across 5 batches.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_clear_and_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_deltas.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // With limit=1, only the injected CLEAR delta is emitted
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, Some(1))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();
        assert_eq!(iterator.pending_batches.len(), 1);
        assert_eq!(iterator.pending_batches[0].len(), 1);
        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);

        // Without a limit the first batch holds CLEAR + the snapshot row
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, None).unwrap();
        iterator.fill_pending_batches().transpose().unwrap();
        assert_eq!(iterator.pending_batches.len(), 5);
        assert_eq!(iterator.pending_batches[0].len(), 2);
        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);
        assert_ne!(iterator.pending_batches[0][1].action, BookAction::Clear);
        let total_deltas: usize = iterator
            .pending_batches
            .iter()
            .map(|batch| batch.len())
            .sum();
        assert_eq!(total_deltas, 6);

        std::fs::remove_file(&temp_file).ok();
    }
1584
    // A snapshot appearing mid-stream should inject a second CLEAR delta, and no
    // CLEAR should carry the F_LAST flag (the batch continues after it).
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_with_mid_snapshot_inserts_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_batched_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();
        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        // CLEARs sit at the head of each snapshot section: indices 0 and 5
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }
1641
    // The injected CLEAR delta counts towards `limit`: limit=4 yields the CLEAR
    // plus only the first three CSV rows.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_limit_includes_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_includes_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(4))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();

        assert_eq!(all_deltas.len(), 4);
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[1].action, BookAction::Add);
        assert_eq!(all_deltas[2].action, BookAction::Update);
        assert_eq!(all_deltas[3].action, BookAction::Update);

        std::fs::remove_file(&temp_file).ok();
    }
1672
    // When `limit` truncates the stream, the final emitted delta must still be
    // flagged F_LAST so downstream consumers can apply the batch.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_limit_sets_f_last() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,0.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,1.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(3))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();

        assert_eq!(all_deltas.len(), 3);
        assert_eq!(
            all_deltas[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final delta should have F_LAST flag when limit is reached"
        );

        std::fs::remove_file(&temp_file).ok();
    }
1703
    // Only the last delta of each batch carries F_LAST; the CLEAR and intermediate
    // snapshot rows within a batch must not.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_snapshot_batch_flags() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_snapshot_batch_flags.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        assert_eq!(iterator.pending_batches.len(), 2);
        let first_batch = &iterator.pending_batches[0];

        // First batch: CLEAR + two snapshot rows; only the final delta is F_LAST
        assert_eq!(first_batch.len(), 3);
        assert_eq!(first_batch[0].action, BookAction::Clear);
        assert_eq!(first_batch[0].flags & RecordFlag::F_LAST.value(), 0);
        assert_eq!(first_batch[1].flags & RecordFlag::F_LAST.value(), 0);
        assert_eq!(
            first_batch[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value()
        );

        // Second batch: a single non-snapshot row, itself marked F_LAST
        assert_eq!(iterator.pending_batches[1].len(), 1);
        assert_eq!(
            iterator.pending_batches[1][0].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value()
        );

        std::fs::remove_file(&temp_file).ok();
    }
1743
1744 #[rstest]
1745 pub fn test_stream_quotes_chunked() {
1746 let csv_data =
1747 "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
1748binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
1749binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
1750binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
1751binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
1752binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";
1753
1754 let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
1755 std::fs::write(&temp_file, csv_data).unwrap();
1756
1757 let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1758 let chunks: Vec<_> = stream.collect();
1759
1760 assert_eq!(chunks.len(), 3);
1761
1762 let chunk1 = chunks[0].as_ref().unwrap();
1763 assert_eq!(chunk1.len(), 2);
1764 assert_eq!(chunk1[0].bid_price.precision, 4);
1765 assert_eq!(chunk1[1].bid_price.precision, 4);
1766
1767 let chunk2 = chunks[1].as_ref().unwrap();
1768 assert_eq!(chunk2.len(), 2);
1769 assert_eq!(chunk2[0].bid_price.precision, 4);
1770 assert_eq!(chunk2[1].bid_price.precision, 4);
1771
1772 let chunk3 = chunks[2].as_ref().unwrap();
1773 assert_eq!(chunk3.len(), 1);
1774 assert_eq!(chunk3[0].bid_price.precision, 4);
1775
1776 let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1777 assert_eq!(total_quotes, 5);
1778
1779 std::fs::remove_file(&temp_file).ok();
1780 }
1781
    // Streams 5 trade rows in chunks of 3, checking sizes, precision, and sides.
    #[rstest]
    pub fn test_stream_trades_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);
        assert_eq!(chunk1[0].price.precision, 4);
        assert_eq!(chunk1[1].price.precision, 4);
        assert_eq!(chunk1[2].price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].price.precision, 4);
        assert_eq!(chunk2[1].price.precision, 4);

        // CSV "buy"/"sell" sides map to buyer/seller aggressors
        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 5);

        std::fs::remove_file(&temp_file).ok();
    }
1818
    // Trades with a zero amount are dropped from the stream entirely.
    #[rstest]
    pub fn test_stream_trades_with_zero_sized_trade() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);

        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
        assert_eq!(chunk1[2].size, Quantity::from("3.0"));

        // trade2 (amount 0.0) was filtered out, so trade3 follows trade1
        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));

        std::fs::remove_file(&temp_file).ok();
    }
1852
1853 #[rstest]
1854 pub fn test_stream_depth10_from_snapshot5_chunked() {
1855 let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1856binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
1857binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
1858binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";
1859
1860 let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
1862 std::fs::write(&temp_file, csv_data).unwrap();
1863
1864 let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
1866 let chunks: Vec<_> = stream.collect();
1867
1868 assert_eq!(chunks.len(), 2);
1870
1871 let chunk1 = chunks[0].as_ref().unwrap();
1873 assert_eq!(chunk1.len(), 2);
1874
1875 let chunk2 = chunks[1].as_ref().unwrap();
1877 assert_eq!(chunk2.len(), 1);
1878
1879 let first_depth = &chunk1[0];
1881 assert_eq!(first_depth.bids.len(), 10); assert_eq!(first_depth.asks.len(), 10);
1883
1884 assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
1886 assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));
1887
1888 let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1890 assert_eq!(total_depths, 3);
1891
1892 std::fs::remove_file(&temp_file).ok();
1894 }
1895
1896 #[rstest]
1897 pub fn test_stream_depth10_from_snapshot25_chunked() {
1898 let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];
1900
1901 let mut bid_headers = Vec::new();
1903 let mut ask_headers = Vec::new();
1904 for i in 0..25 {
1905 bid_headers.push(format!("bids[{i}].price"));
1906 bid_headers.push(format!("bids[{i}].amount"));
1907 }
1908 for i in 0..25 {
1909 ask_headers.push(format!("asks[{i}].price"));
1910 ask_headers.push(format!("asks[{i}].amount"));
1911 }
1912
1913 for header in &bid_headers {
1914 header_parts.push(header);
1915 }
1916 for header in &ask_headers {
1917 header_parts.push(header);
1918 }
1919
1920 let header = header_parts.join(",");
1921
1922 let mut row1_parts = vec![
1924 "binance".to_string(),
1925 "BTCUSDT".to_string(),
1926 "1640995200000000".to_string(),
1927 "1640995200100000".to_string(),
1928 ];
1929
1930 for i in 0..25 {
1932 if i < 5 {
1933 let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
1934 let bid_amount = 1.0 + f64::from(i);
1935 row1_parts.push(bid_price.to_string());
1936 row1_parts.push(bid_amount.to_string());
1937 } else {
1938 row1_parts.push(String::new());
1939 row1_parts.push(String::new());
1940 }
1941 }
1942
1943 for i in 0..25 {
1945 if i < 5 {
1946 let ask_price = f64::from(i).mul_add(0.01, 50000.0);
1947 let ask_amount = 1.0 + f64::from(i);
1948 row1_parts.push(ask_price.to_string());
1949 row1_parts.push(ask_amount.to_string());
1950 } else {
1951 row1_parts.push(String::new());
1952 row1_parts.push(String::new());
1953 }
1954 }
1955
1956 let csv_data = format!("{}\n{}", header, row1_parts.join(","));
1957
1958 let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
1960 std::fs::write(&temp_file, &csv_data).unwrap();
1961
1962 let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
1964 let chunks: Vec<_> = stream.collect();
1965
1966 assert_eq!(chunks.len(), 1);
1968
1969 let chunk1 = chunks[0].as_ref().unwrap();
1970 assert_eq!(chunk1.len(), 1);
1971
1972 let depth = &chunk1[0];
1974 assert_eq!(depth.bids.len(), 10); assert_eq!(depth.asks.len(), 10);
1976
1977 let actual_bid_price = depth.bids[0].price;
1979 let actual_ask_price = depth.asks[0].price;
1980 assert!(actual_bid_price.as_f64() > 0.0);
1981 assert!(actual_ask_price.as_f64() > 0.0);
1982
1983 std::fs::remove_file(&temp_file).ok();
1985 }
1986
1987 #[rstest]
1988 pub fn test_stream_error_handling() {
1989 let non_existent = std::path::Path::new("does_not_exist.csv");
1991
1992 let result = stream_deltas(non_existent, 10, None, None, None, None);
1993 assert!(result.is_err());
1994
1995 let result = stream_quotes(non_existent, 10, None, None, None, None);
1996 assert!(result.is_err());
1997
1998 let result = stream_trades(non_existent, 10, None, None, None, None);
1999 assert!(result.is_err());
2000
2001 let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
2002 assert!(result.is_err());
2003
2004 let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
2005 assert!(result.is_err());
2006 }
2007
2008 #[rstest]
2009 pub fn test_stream_empty_file() {
2010 let temp_file = std::env::temp_dir().join("test_empty.csv");
2012 std::fs::write(&temp_file, "").unwrap();
2013
2014 let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
2015 assert_eq!(stream.count(), 0);
2016
2017 std::fs::remove_file(&temp_file).ok();
2019 }
2020
    // The streaming path must produce deltas identical to the bulk `load_deltas`
    // path for the same file (same count, IDs, actions, sides, timestamps).
    #[rstest]
    pub fn test_stream_precision_consistency() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
        let streamed_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(bulk_deltas.len(), streamed_deltas.len());

        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
            assert_eq!(bulk.instrument_id, streamed.instrument_id);
            assert_eq!(bulk.action, streamed.action);
            assert_eq!(bulk.order.side, streamed.order.side);
            assert_eq!(bulk.ts_event, streamed.ts_event);
            assert_eq!(bulk.ts_init, streamed.ts_init);
        }

        std::fs::remove_file(&temp_file).ok();
    }
2056
    // Streams the checked-in trades fixture one record at a time and spot-checks values.
    #[rstest]
    pub fn test_stream_trades_from_local_file() {
        let filepath = get_test_data_path("csv/trades_1.csv");
        let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].price, Price::from("8531.5"));

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].size, Quantity::from("1000"));

        // Fixture holds exactly two trades
        assert!(stream.next().is_none());
    }
2072
    // Streams the checked-in deltas fixture one record at a time; the first chunk
    // holds the injected CLEAR, subsequent chunks hold parsed book deltas.
    #[rstest]
    pub fn test_stream_deltas_from_local_file() {
        let filepath = get_test_data_path("csv/deltas_1.csv");
        let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].action, BookAction::Clear);

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].order.price, Price::from("6421.5"));

        let chunk3 = stream.next().unwrap().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].order.size, Quantity::from("10000"));

        assert!(stream.next().is_none());
    }
2096
    // `limit` caps total streamed deltas: 5 rows with limit=3 -> chunks of 2 and 1.
    #[rstest]
    pub fn test_stream_deltas_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 3);

        std::fs::remove_file(&temp_file).ok();
    }
2126
    // `limit` caps total streamed quotes: 4 rows with limit=2 -> one chunk of 2.
    #[rstest]
    pub fn test_stream_quotes_with_limit() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";

        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 2);

        std::fs::remove_file(&temp_file).ok();
    }
2154
    // `limit` caps total streamed trades: 5 rows with limit=3 -> chunks of 2 and 1.
    #[rstest]
    pub fn test_stream_trades_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 3);

        std::fs::remove_file(&temp_file).ok();
    }
2184
2185 #[rstest]
2186 pub fn test_depth10_invalid_levels_error_at_construction() {
2187 let temp_file = std::env::temp_dir().join("test_depth10_invalid_levels.csv");
2188 std::fs::write(&temp_file, "exchange,symbol,timestamp,local_timestamp\n").unwrap();
2189
2190 let result = Depth10StreamIterator::new(&temp_file, 10, 10, None, None, None, None);
2191 assert!(result.is_err());
2192 let err_msg = result.err().unwrap().to_string();
2193 assert!(
2194 err_msg.contains("Invalid levels"),
2195 "Error should mention 'Invalid levels': {err_msg}"
2196 );
2197
2198 let result = Depth10StreamIterator::new(&temp_file, 10, 3, None, None, None, None);
2199 assert!(result.is_err());
2200
2201 let result = Depth10StreamIterator::new(&temp_file, 10, 5, None, None, None, None);
2202 assert!(result.is_ok());
2203
2204 let result = Depth10StreamIterator::new(&temp_file, 10, 25, None, None, None, None);
2205 assert!(result.is_ok());
2206
2207 std::fs::remove_file(&temp_file).ok();
2208 }
2209
    // A mid-stream snapshot should inject a second CLEAR delta in the plain
    // (non-batched) streaming path too, and no CLEAR carries F_LAST.
    #[rstest]
    pub fn test_stream_deltas_with_mid_snapshot_inserts_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, None).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        // CLEARs sit at the head of each snapshot section: indices 0 and 5
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }
2262
    // The bulk-load path must match the streaming path's CLEAR-injection behavior
    // for a fixture containing a mid-day snapshot.
    #[rstest]
    pub fn test_load_deltas_with_mid_snapshot_inserts_clear() {
        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
        let deltas = load_deltas(&filepath, Some(1), Some(1), None, None).unwrap();

        let clear_count = deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        assert_eq!(deltas[0].action, BookAction::Clear);

        // Locate the second CLEAR (injected at the mid-day snapshot boundary)
        let second_clear_idx = deltas
            .iter()
            .enumerate()
            .filter(|(_, d)| d.action == BookAction::Clear)
            .nth(1)
            .map(|(i, _)| i)
            .expect("Should have second CLEAR");

        assert_eq!(
            second_clear_idx, 6,
            "Second CLEAR should be at index 6, found {second_clear_idx}"
        );

        assert_eq!(
            deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            deltas[6].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 6 should not have F_LAST flag"
        );
    }
2307
2308 #[rstest]
2309 fn test_stream_deltas_chunk_size_respects_clear() {
2310 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2314binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2315binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2316
2317 let temp_file = std::env::temp_dir().join("test_stream_chunk_size_clear.csv");
2318 std::fs::write(&temp_file, csv_data).unwrap();
2319
2320 let stream = stream_deltas(&temp_file, 1, Some(1), Some(1), None, None).unwrap();
2322 let chunks: Vec<_> = stream.collect();
2323
2324 assert_eq!(chunks.len(), 3, "Expected 3 chunks with chunk_size=1");
2326 assert_eq!(chunks[0].as_ref().unwrap().len(), 1);
2327 assert_eq!(chunks[1].as_ref().unwrap().len(), 1);
2328 assert_eq!(chunks[2].as_ref().unwrap().len(), 1);
2329
2330 assert_eq!(chunks[0].as_ref().unwrap()[0].action, BookAction::Clear);
2332 assert_eq!(chunks[1].as_ref().unwrap()[0].action, BookAction::Add);
2334 assert_eq!(chunks[2].as_ref().unwrap()[0].action, BookAction::Add);
2335
2336 std::fs::remove_file(&temp_file).ok();
2337 }
2338
2339 #[rstest]
2340 fn test_stream_deltas_limit_stops_at_clear() {
2341 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2343binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2344binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2345
2346 let temp_file = std::env::temp_dir().join("test_stream_limit_stops_at_clear.csv");
2347 std::fs::write(&temp_file, csv_data).unwrap();
2348
2349 let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(1)).unwrap();
2351 let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2352
2353 assert_eq!(all_deltas.len(), 1);
2354 assert_eq!(all_deltas[0].action, BookAction::Clear);
2355
2356 std::fs::remove_file(&temp_file).ok();
2357 }
2358
2359 #[rstest]
2360 fn test_stream_deltas_limit_includes_clear() {
2361 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2363binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2364binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2365binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2366binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2367binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2368
2369 let temp_file = std::env::temp_dir().join("test_stream_limit_includes_clear.csv");
2370 std::fs::write(&temp_file, csv_data).unwrap();
2371
2372 let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(4)).unwrap();
2374 let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2375
2376 assert_eq!(all_deltas.len(), 4);
2377 assert_eq!(all_deltas[0].action, BookAction::Clear);
2378 assert_eq!(all_deltas[1].action, BookAction::Add);
2379 assert_eq!(all_deltas[2].action, BookAction::Add);
2380 assert_eq!(all_deltas[3].action, BookAction::Update);
2381
2382 std::fs::remove_file(&temp_file).ok();
2383 }
2384
2385 #[rstest]
2386 fn test_stream_deltas_limit_sets_f_last() {
2387 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2389binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2390binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2391binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2392binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2393binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2394
2395 let temp_file = std::env::temp_dir().join("test_stream_limit_f_last.csv");
2396 std::fs::write(&temp_file, csv_data).unwrap();
2397
2398 let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(3)).unwrap();
2400 let chunks: Vec<_> = stream.collect();
2401
2402 assert_eq!(chunks.len(), 1);
2404 let deltas = chunks[0].as_ref().unwrap();
2405 assert_eq!(deltas.len(), 3);
2406
2407 assert_eq!(
2409 deltas[2].flags & RecordFlag::F_LAST.value(),
2410 RecordFlag::F_LAST.value(),
2411 "Final delta should have F_LAST flag when limit is reached"
2412 );
2413
2414 std::fs::remove_file(&temp_file).ok();
2415 }
2416
2417 #[rstest]
2418 fn test_stream_deltas_chunk_boundary_no_f_last() {
2419 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2421binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
2422binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,ask,50001.0,2.0
2423binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,49999.0,0.5";
2424
2425 let temp_file = std::env::temp_dir().join("test_stream_chunk_no_f_last.csv");
2426 std::fs::write(&temp_file, csv_data).unwrap();
2427
2428 let mut stream = stream_deltas(&temp_file, 2, Some(1), Some(1), None, None).unwrap();
2430
2431 let chunk1 = stream.next().unwrap().unwrap();
2432 assert_eq!(chunk1.len(), 2);
2433
2434 assert_eq!(
2436 chunk1[1].flags & RecordFlag::F_LAST.value(),
2437 0,
2438 "Mid-stream chunk should not have F_LAST flag"
2439 );
2440
2441 let chunk2 = stream.next().unwrap().unwrap();
2443 assert_eq!(chunk2.len(), 1);
2444 assert_eq!(
2445 chunk2[0].flags & RecordFlag::F_LAST.value(),
2446 RecordFlag::F_LAST.value(),
2447 "Final chunk at EOF should have F_LAST flag"
2448 );
2449
2450 std::fs::remove_file(&temp_file).ok();
2451 }
2452}