use std::{io::Read, path::Path};

use csv::{Reader, StringRecord};
use nautilus_core::UnixNanos;
use nautilus_model::{
    data::{
        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
        TradeTick,
    },
    enums::{OrderSide, RecordFlag},
    identifiers::InstrumentId,
    types::Quantity,
};
#[cfg(feature = "python")]
use nautilus_model::{
    data::{Data, OrderBookDeltas, OrderBookDeltas_API},
    python::data::data_to_pycapsule,
};
#[cfg(feature = "python")]
use pyo3::{Py, PyAny, Python};

use crate::{
    csv::{
        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
        record::{
            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
        },
    },
    parse::{parse_instrument_id, parse_timestamp},
};

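/// Streaming iterator over a Tardis incremental book CSV file, yielding
/// [`OrderBookDelta`] chunks of up to `chunk_size` elements.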
struct DeltaStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDelta>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    last_is_snapshot: bool,
    limit: Option<usize>,
    deltas_emitted: usize,
    // Record parsed but not yet emitted, carried over when a chunk fills mid-snapshot
    pending_record: Option<TardisBookUpdateRecord>,
}

impl DeltaStreamIterator {
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            last_is_snapshot: false,
            limit,
            deltas_emitted: 0,
            pending_record: None,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for DeltaStreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDelta>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();

        loop {
            if self.buffer.len() >= self.chunk_size {
                break;
            }

            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            let data = if let Some(pending) = self.pending_record.take() {
                pending
            } else {
                match self.reader.read_record(&mut self.record) {
                    Ok(true) => match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    },
                    Ok(false) => {
                        if self.buffer.is_empty() {
                            return None;
                        }
                        if let Some(last_delta) = self.buffer.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        return Some(Ok(self.buffer.clone()));
                    }
                    Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
                }
            };

            // A new snapshot section begins: emit a CLEAR delta before its records
            if data.is_snapshot && !self.last_is_snapshot {
                let clear_instrument_id = self
                    .instrument_id
                    .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
                let ts_event = parse_timestamp(data.timestamp);
                let ts_init = parse_timestamp(data.local_timestamp);

                if self.last_ts_event != ts_event
                    && let Some(last_delta) = self.buffer.last_mut()
                {
                    last_delta.flags = RecordFlag::F_LAST.value();
                }
                self.last_ts_event = ts_event;

                let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
                self.buffer.push(clear_delta);
                self.deltas_emitted += 1;

                if self.buffer.len() >= self.chunk_size
                    || self.limit.is_some_and(|l| self.deltas_emitted >= l)
                {
                    self.last_is_snapshot = data.is_snapshot;
                    self.pending_record = Some(data);
                    break;
                }
            }
            self.last_is_snapshot = data.is_snapshot;

            let delta = match parse_delta_record(
                &data,
                self.price_precision,
                self.size_precision,
                self.instrument_id,
            ) {
                Ok(d) => d,
                Err(e) => {
                    tracing::warn!("Skipping invalid delta record: {e}");
                    continue;
                }
            };

            if self.last_ts_event != delta.ts_event
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }

            self.last_ts_event = delta.ts_event;

            self.buffer.push(delta);
            self.deltas_emitted += 1;
        }

        if self.buffer.is_empty() {
            None
        } else {
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            Some(Ok(self.buffer.clone()))
        }
    }
}

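/// Streams [`OrderBookDelta`] chunks from a Tardis format CSV at the given `filepath`.
///
/// When `price_precision` or `size_precision` is `None`, the precision is inferred by
/// sampling up to the first 10,000 records. Snapshot sections emit a leading CLEAR delta.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a record fails to parse.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let stream = stream_deltas("incremental_book_L2.csv", 100_000, None, None, None, None)?;
/// for chunk in stream {
///     for delta in chunk? {
///         // process each OrderBookDelta
///     }
/// }
/// ```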
pub fn stream_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
    DeltaStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

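/// Streaming iterator that groups deltas sharing a `ts_event` into [`OrderBookDeltas`]
/// batches and yields chunks of Python capsules (`chunk_size` batches per chunk).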
#[cfg(feature = "python")]
struct BatchedDeltasStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<Py<PyAny>>,
    current_batch: Vec<OrderBookDelta>,
    pending_batches: Vec<Vec<OrderBookDelta>>,
    chunk_size: usize,
    instrument_id: InstrumentId,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    limit: Option<usize>,
    records_processed: usize,
}

#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

#[cfg(feature = "python")]
impl Iterator for BatchedDeltasStreamIterator {
    type Item = anyhow::Result<Vec<Py<PyAny>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut batches_created = 0;

        while batches_created < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let delta = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => match parse_delta_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            Some(self.instrument_id),
                        ) {
                            Ok(d) => d,
                            Err(e) => {
                                tracing::warn!("Skipping invalid delta record: {e}");
                                continue;
                            }
                        },
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    };

                    // A new event timestamp closes the current batch
                    if self.last_ts_event != delta.ts_event && !self.current_batch.is_empty() {
                        if let Some(last_delta) = self.current_batch.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        self.pending_batches
                            .push(std::mem::take(&mut self.current_batch));
                        batches_created += 1;
                    }

                    self.last_ts_event = delta.ts_event;
                    self.current_batch.push(delta);
                    self.records_processed += 1;

                    if let Some(limit) = self.limit
                        && self.records_processed >= limit
                    {
                        break;
                    }
                }
                Ok(false) => break, // EOF: flush any partial batch below
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
            if let Some(last_delta) = self.current_batch.last_mut() {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            self.pending_batches
                .push(std::mem::take(&mut self.current_batch));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            // Attach to the Python interpreter to wrap each batch in a PyCapsule
            Python::attach(|py| {
                for batch in self.pending_batches.drain(..) {
                    let deltas = OrderBookDeltas::new(self.instrument_id, batch);
                    let deltas = OrderBookDeltas_API::new(deltas);
                    let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
                    self.buffer.push(capsule);
                }
            });
            Some(Ok(std::mem::take(&mut self.buffer)))
        }
    }
}

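/// Streams Tardis format CSV deltas as Python capsules, one [`OrderBookDeltas`] batch
/// per unique `ts_event`, with up to `chunk_size` batches per chunk.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, is empty, or a record fails to parse.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let stream = stream_batched_deltas("incremental_book_L2.csv", 1_000, None, None, None, None)?;
/// for chunk in stream {
///     for capsule in chunk? {
///         // pass each PyCapsule across the Python boundary
///     }
/// }
/// ```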
#[cfg(feature = "python")]
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>> {
    BatchedDeltasStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

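/// Streaming iterator over a Tardis quotes CSV file, yielding [`QuoteTick`] chunks
/// of up to `chunk_size` elements.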
struct QuoteStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<QuoteTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl QuoteStreamIterator {
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
                        if let Some(bid_price_val) = data.bid_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price_val));
                        }
                        if let Some(ask_price_val) = data.ask_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price_val));
                        }
                        if let Some(bid_amount_val) = data.bid_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount_val));
                        }
                        if let Some(ask_amount_val) = data.ask_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount_val));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for QuoteStreamIterator {
    type Item = anyhow::Result<Vec<QuoteTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
                    Ok(data) => {
                        let quote = parse_quote_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            self.instrument_id,
                        );

                        self.buffer.push(quote);
                        records_read += 1;
                        self.records_processed += 1;

                        if let Some(limit) = self.limit
                            && self.records_processed >= limit
                        {
                            break;
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

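/// Streams [`QuoteTick`] chunks from a Tardis format CSV at the given `filepath`.
///
/// When `price_precision` or `size_precision` is `None`, the precision is inferred by
/// sampling up to the first 10,000 records.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a record fails to parse.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let stream = stream_quotes("quotes.csv", 100_000, None, None, None, None)?;
/// for chunk in stream {
///     let quotes = chunk?;
///     // process the QuoteTicks
/// }
/// ```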
pub fn stream_quotes<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
    QuoteStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

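/// Streaming iterator over a Tardis trades CSV file, yielding [`TradeTick`] chunks
/// of up to `chunk_size` elements.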
struct TradeStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<TradeTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl TradeStreamIterator {
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for TradeStreamIterator {
    type Item = anyhow::Result<Vec<TradeTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
                    Ok(data) => {
                        let size = Quantity::new(data.amount, self.size_precision);

                        if size.is_positive() {
                            let trade = parse_trade_record(
                                &data,
                                size,
                                self.price_precision,
                                self.instrument_id,
                            );

                            self.buffer.push(trade);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        } else {
                            log::warn!("Skipping zero-sized trade: {data:?}");
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

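/// Streams [`TradeTick`] chunks from a Tardis format CSV at the given `filepath`.
///
/// Zero-sized trades are skipped with a warning. When `price_precision` or
/// `size_precision` is `None`, the precision is inferred by sampling up to the
/// first 10,000 records.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a record fails to parse.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let stream = stream_trades("trades.csv", 100_000, None, None, None, None)?;
/// for chunk in stream {
///     let trades = chunk?;
///     // process the TradeTicks
/// }
/// ```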
pub fn stream_trades<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
    TradeStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

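/// Streaming iterator over Tardis book snapshot CSV files (5 or 25 levels),
/// yielding [`OrderBookDepth10`] chunks of up to `chunk_size` elements.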
struct Depth10StreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDepth10>,
    chunk_size: usize,
    levels: u8,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl Depth10StreamIterator {
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        levels: u8,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        anyhow::ensure!(
            levels == 5 || levels == 25,
            "Invalid levels: {levels}. Must be 5 or 25."
        );

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            levels,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn process_snapshot5(&mut self, data: TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Levels beyond the 5 in the snapshot remain NULL_ORDER with zero counts
        for i in 0..5 {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    fn process_snapshot25(&mut self, data: TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Only the top 10 of the 25 snapshot levels are retained
        for i in 0..DEPTH10_LEN {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                5 => (data.bids_5_price, data.bids_5_amount),
                6 => (data.bids_6_price, data.bids_6_amount),
                7 => (data.bids_7_price, data.bids_7_amount),
                8 => (data.bids_8_price, data.bids_8_amount),
                9 => (data.bids_9_price, data.bids_9_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                5 => (data.asks_5_price, data.asks_5_amount),
                6 => (data.asks_6_price, data.asks_6_amount),
                7 => (data.asks_7_price, data.asks_7_amount),
                8 => (data.asks_8_price, data.asks_8_amount),
                9 => (data.asks_9_price, data.asks_9_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    } else if let Ok(data) =
                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
                    {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for Depth10StreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDepth10>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        // Yield any items buffered by a previous call first
        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = match self.levels {
                        5 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot5Record>(None)
                            .map(|data| self.process_snapshot5(data)),
                        25 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot25Record>(None)
                            .map(|data| self.process_snapshot25(data)),
                        _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
                    };

                    match result {
                        Ok(depth) => {
                            self.buffer.push(depth);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

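/// Streams [`OrderBookDepth10`] chunks from a Tardis format 5-level snapshot CSV,
/// padding the remaining levels with null orders.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a record fails to parse.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let stream = stream_depth10_from_snapshot5("book_snapshot_5.csv", 10_000, None, None, None, None)?;
/// for chunk in stream {
///     let depths = chunk?;
///     // process the OrderBookDepth10 snapshots
/// }
/// ```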
pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        5,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

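/// Streams [`OrderBookDepth10`] chunks from a Tardis format 25-level snapshot CSV,
/// keeping only the top 10 levels per side.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a record fails to parse.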
pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        25,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
1303
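/// Streaming iterator over a Tardis derivative ticker CSV file, yielding
/// [`FundingRateUpdate`] chunks of up to `chunk_size` elements.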
struct FundingRateStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<FundingRateUpdate>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
    records_processed: usize,
}

impl FundingRateStreamIterator {
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            limit,
            records_processed: 0,
        })
    }
}

impl Iterator for FundingRateStreamIterator {
    type Item = anyhow::Result<Vec<FundingRateUpdate>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        // Yield any items buffered by a previous call first
        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = self
                        .record
                        .deserialize::<TardisDerivativeTickerRecord>(None)
                        .map_err(anyhow::Error::from)
                        .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));

                    match result {
                        Ok(Some(funding_rate)) => {
                            self.buffer.push(funding_rate);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Ok(None) => {
                            // Ticker record carried no funding rate info; count it and move on
                            self.records_processed += 1;
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!(
                                "Failed to parse funding rate record: {e}"
                            )));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

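/// Streams [`FundingRateUpdate`] chunks from a Tardis derivative ticker CSV at the
/// given `filepath`, skipping records without funding rate information.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a record fails to parse.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let stream = stream_funding_rates("derivative_ticker.csv", 10_000, None, None)?;
/// for chunk in stream {
///     let updates = chunk?;
///     // process the FundingRateUpdates
/// }
/// ```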
pub fn stream_funding_rates<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
}

#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::TradeId,
        types::Price,
    };
    use rstest::*;

    use super::*;
    use crate::{csv::load::load_deltas, parse::parse_price, tests::get_test_data_path};

    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321, 9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        assert_eq!(infer_precision(input), expected);
    }

    #[rstest]
    pub fn test_stream_deltas_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].action, BookAction::Clear);
        assert_eq!(chunk1[1].order.price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].order.price.precision, 4);
        assert_eq!(chunk2[1].order.price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 2);
        assert_eq!(chunk3[0].order.price.precision, 4);
        assert_eq!(chunk3[1].order.price.precision, 4);

        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 6);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_quotes_chunked() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";

        let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].bid_price.precision, 4);
        assert_eq!(chunk1[1].bid_price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].bid_price.precision, 4);
        assert_eq!(chunk2[1].bid_price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].bid_price.precision, 4);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);
        assert_eq!(chunk1[0].price.precision, 4);
        assert_eq!(chunk1[1].price.precision, 4);
        assert_eq!(chunk1[2].price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].price.precision, 4);
        assert_eq!(chunk2[1].price.precision, 4);

        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_with_zero_sized_trade() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);

        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
        assert_eq!(chunk1[2].size, Quantity::from("3.0"));

        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_depth10_from_snapshot5_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";

        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let first_depth = &chunk1[0];
        assert_eq!(first_depth.bids.len(), 10);
        assert_eq!(first_depth.asks.len(), 10);

        assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
        assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));

        let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_depths, 3);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_depth10_from_snapshot25_chunked() {
        let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];

        let mut bid_headers = Vec::new();
        let mut ask_headers = Vec::new();
        for i in 0..25 {
            bid_headers.push(format!("bids[{i}].price"));
            bid_headers.push(format!("bids[{i}].amount"));
        }
        for i in 0..25 {
            ask_headers.push(format!("asks[{i}].price"));
            ask_headers.push(format!("asks[{i}].amount"));
        }

        for header in &bid_headers {
            header_parts.push(header);
        }
        for header in &ask_headers {
            header_parts.push(header);
        }

        let header = header_parts.join(",");

        let mut row1_parts = vec![
            "binance".to_string(),
            "BTCUSDT".to_string(),
            "1640995200000000".to_string(),
            "1640995200100000".to_string(),
        ];

        for i in 0..25 {
            if i < 5 {
                let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
                let bid_amount = 1.0 + f64::from(i);
                row1_parts.push(bid_price.to_string());
                row1_parts.push(bid_amount.to_string());
            } else {
                row1_parts.push(String::new());
                row1_parts.push(String::new());
            }
        }

        for i in 0..25 {
            if i < 5 {
                let ask_price = f64::from(i).mul_add(0.01, 50000.0);
                let ask_amount = 1.0 + f64::from(i);
                row1_parts.push(ask_price.to_string());
                row1_parts.push(ask_amount.to_string());
            } else {
                row1_parts.push(String::new());
                row1_parts.push(String::new());
            }
        }

        let csv_data = format!("{}\n{}", header, row1_parts.join(","));

        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
        std::fs::write(&temp_file, &csv_data).unwrap();

        let stream =
            stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 1);

        let depth = &chunk1[0];
        assert_eq!(depth.bids.len(), 10);
        assert_eq!(depth.asks.len(), 10);

        let actual_bid_price = depth.bids[0].price;
        let actual_ask_price = depth.asks[0].price;
        assert!(actual_bid_price.as_f64() > 0.0);
        assert!(actual_ask_price.as_f64() > 0.0);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_error_handling() {
        let non_existent = std::path::Path::new("does_not_exist.csv");

        let result = stream_deltas(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_quotes(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_trades(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
        assert!(result.is_err());
    }

    #[rstest]
    pub fn test_stream_empty_file() {
        let temp_file = std::env::temp_dir().join("test_empty.csv");
        std::fs::write(&temp_file, "").unwrap();

        let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
        assert_eq!(stream.count(), 0);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_precision_consistency() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
        let streamed_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(bulk_deltas.len(), streamed_deltas.len());

        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
            assert_eq!(bulk.instrument_id, streamed.instrument_id);
            assert_eq!(bulk.action, streamed.action);
            assert_eq!(bulk.order.side, streamed.order.side);
            assert_eq!(bulk.ts_event, streamed.ts_event);
            assert_eq!(bulk.ts_init, streamed.ts_init);
        }

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_from_local_file() {
        let filepath = get_test_data_path("csv/trades_1.csv");
        let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].price, Price::from("8531.5"));

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].size, Quantity::from("1000"));

        assert!(stream.next().is_none());
    }

    #[rstest]
    pub fn test_stream_deltas_from_local_file() {
        let filepath = get_test_data_path("csv/deltas_1.csv");
        let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].action, BookAction::Clear);

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].order.price, Price::from("6421.5"));

        let chunk3 = stream.next().unwrap().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].order.size, Quantity::from("10000"));

        assert!(stream.next().is_none());
    }

    #[rstest]
    pub fn test_stream_deltas_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 3);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_quotes_with_limit() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";

        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 2);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 3);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_depth10_invalid_levels_error_at_construction() {
        let temp_file = std::env::temp_dir().join("test_depth10_invalid_levels.csv");
        std::fs::write(&temp_file, "exchange,symbol,timestamp,local_timestamp\n").unwrap();

        let result = Depth10StreamIterator::new(&temp_file, 10, 10, None, None, None, None);
        assert!(result.is_err());
        let err_msg = result.err().unwrap().to_string();
        assert!(
            err_msg.contains("Invalid levels"),
            "Error should mention 'Invalid levels': {err_msg}"
        );

        let result = Depth10StreamIterator::new(&temp_file, 10, 3, None, None, None, None);
        assert!(result.is_err());

        let result = Depth10StreamIterator::new(&temp_file, 10, 5, None, None, None, None);
        assert!(result.is_ok());

        let result = Depth10StreamIterator::new(&temp_file, 10, 25, None, None, None, None);
        assert!(result.is_ok());

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_deltas_with_mid_snapshot_inserts_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, None).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), got {clear_count}"
        );

        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_load_deltas_with_mid_snapshot_inserts_clear() {
        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
        let deltas = load_deltas(&filepath, Some(1), Some(1), None, None).unwrap();

        let clear_count = deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), got {clear_count}"
        );

        assert_eq!(deltas[0].action, BookAction::Clear);

        let second_clear_idx = deltas
            .iter()
            .enumerate()
            .filter(|(_, d)| d.action == BookAction::Clear)
            .nth(1)
            .map(|(i, _)| i)
            .expect("Should have second CLEAR");

        assert_eq!(
            second_clear_idx, 6,
            "Second CLEAR should be at index 6, got {second_clear_idx}"
        );

        assert_eq!(
            deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            deltas[6].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 6 should not have F_LAST flag"
        );
    }

    #[rstest]
    fn test_stream_deltas_chunk_size_respects_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";

        let temp_file = std::env::temp_dir().join("test_stream_chunk_size_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 1, Some(1), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3, "Expected 3 chunks with chunk_size=1");
        assert_eq!(chunks[0].as_ref().unwrap().len(), 1);
        assert_eq!(chunks[1].as_ref().unwrap().len(), 1);
        assert_eq!(chunks[2].as_ref().unwrap().len(), 1);

        assert_eq!(chunks[0].as_ref().unwrap()[0].action, BookAction::Clear);
        assert_eq!(chunks[1].as_ref().unwrap()[0].action, BookAction::Add);
        assert_eq!(chunks[2].as_ref().unwrap()[0].action, BookAction::Add);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_limit_stops_at_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";

        let temp_file = std::env::temp_dir().join("test_stream_limit_stops_at_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(1)).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(all_deltas.len(), 1);
        assert_eq!(all_deltas[0].action, BookAction::Clear);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_limit_includes_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_limit_includes_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(4)).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(all_deltas.len(), 4);
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[1].action, BookAction::Add);
        assert_eq!(all_deltas[2].action, BookAction::Add);
        assert_eq!(all_deltas[3].action, BookAction::Update);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_limit_sets_f_last() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_limit_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);
        let deltas = chunks[0].as_ref().unwrap();
        assert_eq!(deltas.len(), 3);

        assert_eq!(
            deltas[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final delta should have F_LAST flag when limit is reached"
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_chunk_boundary_no_f_last() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,49999.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_chunk_no_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut stream = stream_deltas(&temp_file, 2, Some(1), Some(1), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 2);

        assert_eq!(
            chunk1[1].flags & RecordFlag::F_LAST.value(),
            0,
            "Mid-stream chunk should not have F_LAST flag"
        );

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(
            chunk2[0].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final chunk at EOF should have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }
}